1#![allow(unused_imports)]
18
19use std::io;
20use std::io::prelude::*;
21use std::io::ErrorKind;
22use std::path::Path;
23use std::process::{Child, Command, Stdio};
24
25use anyhow::{bail, Context, Result};
26#[allow(unused_imports)]
27use log::{debug, error, info, trace, warn};
28
29use crate::flist::{read_file_list, FileList};
30use crate::mux::DemuxRead;
31use crate::varint::{ReadVarint, WriteVarint};
32use crate::{Options, ServerStatistics};
33
/// Protocol version this client announces to the server during the handshake.
///
/// A server that reports a lower version than this is rejected.
const MY_PROTOCOL_VERSION: i32 = 27;
35
36pub(crate) struct Connection {
41 rv: ReadVarint,
42 wv: WriteVarint,
43
44 protocol_version: i32,
46
47 #[allow(unused)]
48 salt: i32,
49
50 child: Child,
51
52 #[allow(unused)]
53 options: Options,
54}
55
56impl Connection {
57 pub(crate) fn handshake(
61 r: Box<dyn Read>,
62 w: Box<dyn Write>,
63 child: Child,
64 options: Options,
65 ) -> Result<Connection> {
66 let mut wv = WriteVarint::new(w);
67 let mut rv = ReadVarint::new(r);
68
69 wv.write_i32(MY_PROTOCOL_VERSION)?;
70 let remote_protocol_version = rv.read_i32().unwrap();
71 if remote_protocol_version < MY_PROTOCOL_VERSION {
72 bail!(
73 "server protocol version {} is too old",
74 remote_protocol_version
75 );
76 }
77 let salt = rv.read_i32().unwrap();
81 debug!(
82 "Connected to server version {}, salt {:#x}",
83 remote_protocol_version, salt
84 );
85 let protocol_version = std::cmp::min(MY_PROTOCOL_VERSION, remote_protocol_version);
86 debug!("Agreed protocol version {}", protocol_version);
87
88 let rv = ReadVarint::new(Box::new(DemuxRead::new(rv.take())));
92
93 Ok(Connection {
94 rv,
95 wv,
96 protocol_version,
97 salt,
98 child,
99 options,
100 })
101 }
102
103 pub(crate) fn list_files(mut self) -> Result<(FileList, ServerStatistics)> {
108 let max_phase = 2;
111
112 self.send_exclusions()?;
114 let mut file_list = read_file_list(&mut self.rv)?;
115 crate::flist::sort(&mut file_list);
116 if self.protocol_version < 30 {
120 let io_error_count = self
121 .rv
122 .read_i32()
123 .context("Failed to read server error count")?;
124 if io_error_count > 0 {
125 warn!("Server reports {} IO errors", io_error_count);
127 }
128 }
129
130 for phase in 1..=max_phase {
131 debug!("Start phase {}", phase);
132
133 self.wv
134 .write_i32(-1)
135 .context("Failed to send phase transition")?; if file_list.is_empty() {
139 info!("Server returned no files, so we're done");
140 self.shutdown()?;
141 return Ok((file_list, ServerStatistics::default()));
142 }
143
144 assert_eq!(
145 self.rv
146 .read_i32()
147 .context("Failed to read phase transition")?,
148 -1
149 );
150 }
151
152 debug!("Send end of sequence");
153 self.wv
154 .write_i32(-1)
155 .context("Failed to send end-of-sequence marker")?;
156 let server_stats = self
158 .read_server_statistics()
159 .context("Failed to read server statistics")?;
160 info!("{:#?}", server_stats);
161
162 self.shutdown()?;
164 Ok((file_list, server_stats))
165 }
166
167 fn shutdown(self) -> Result<()> {
172 let Connection {
173 rv,
174 wv,
175 protocol_version: _,
176 salt: _,
177 mut child,
178 options: _,
179 } = self;
180
181 rv.check_for_eof()?;
182 drop(wv);
183
184 let child_result = child.wait()?;
188 info!("Child process exited: {}", child_result);
189
190 Ok(())
191 }
192
193 fn send_exclusions(&mut self) -> Result<()> {
194 self.wv
195 .write_i32(0)
196 .context("Failed to send exclusion list")
197 }
198
199 fn read_server_statistics(&mut self) -> Result<ServerStatistics> {
200 Ok(ServerStatistics {
201 total_bytes_read: self.rv.read_i64()?,
202 total_bytes_written: self.rv.read_i64()?,
203 total_file_size: self.rv.read_i64()?,
204 flist_build_time: if self.protocol_version >= 29 {
205 Some(self.rv.read_i64()?)
206 } else {
207 None
208 },
209 flist_xfer_time: if self.protocol_version >= 29 {
210 Some(self.rv.read_i64()?)
211 } else {
212 None
213 },
214 })
215 }
216}