rsyn/
connection.rs

1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A connection to an rsync server.
16
17#![allow(unused_imports)]
18
19use std::io;
20use std::io::prelude::*;
21use std::io::ErrorKind;
22use std::path::Path;
23use std::process::{Child, Command, Stdio};
24
25use anyhow::{bail, Context, Result};
26#[allow(unused_imports)]
27use log::{debug, error, info, trace, warn};
28
29use crate::flist::{read_file_list, FileList};
30use crate::mux::DemuxRead;
31use crate::varint::{ReadVarint, WriteVarint};
32use crate::{Options, ServerStatistics};
33
34const MY_PROTOCOL_VERSION: i32 = 27;
35
36/// Connection to an rsync server.
37///
38/// Due to the protocol definition, only one transfer (list, send, or receive)
39/// can be done per connection.
40pub(crate) struct Connection {
41    rv: ReadVarint,
42    wv: WriteVarint,
43
44    /// Mutually-agreed rsync protocol version number.
45    protocol_version: i32,
46
47    #[allow(unused)]
48    salt: i32,
49
50    child: Child,
51
52    #[allow(unused)]
53    options: Options,
54}
55
56impl Connection {
57    /// Start a new connection, by doing the rsync handshake protocol.
58    ///
59    /// The public interface is through `Client`.
60    pub(crate) fn handshake(
61        r: Box<dyn Read>,
62        w: Box<dyn Write>,
63        child: Child,
64        options: Options,
65    ) -> Result<Connection> {
66        let mut wv = WriteVarint::new(w);
67        let mut rv = ReadVarint::new(r);
68
69        wv.write_i32(MY_PROTOCOL_VERSION)?;
70        let remote_protocol_version = rv.read_i32().unwrap();
71        if remote_protocol_version < MY_PROTOCOL_VERSION {
72            bail!(
73                "server protocol version {} is too old",
74                remote_protocol_version
75            );
76        }
77        // The server and client agree to use the minimum supported version, which will now be
78        // ours.
79
80        let salt = rv.read_i32().unwrap();
81        debug!(
82            "Connected to server version {}, salt {:#x}",
83            remote_protocol_version, salt
84        );
85        let protocol_version = std::cmp::min(MY_PROTOCOL_VERSION, remote_protocol_version);
86        debug!("Agreed protocol version {}", protocol_version);
87
88        // Server-to-client is multiplexed; client-to-server is not.
89        // Pull back the underlying stream and wrap it in a demuxed varint
90        // encoder.
91        let rv = ReadVarint::new(Box::new(DemuxRead::new(rv.take())));
92
93        Ok(Connection {
94            rv,
95            wv,
96            protocol_version,
97            salt,
98            child,
99            options,
100        })
101    }
102
103    /// Lists files in the target directory on the server.
104    ///
105    /// The file list is in the sorted order defined by the protocol, which
106    /// is strcmp on the raw bytes of the names.
107    pub(crate) fn list_files(mut self) -> Result<(FileList, ServerStatistics)> {
108        // Analogous to rsync/receiver.c recv_files().
109        // let max_phase = if self.protocol_version >= 29 { 2 } else { 1 };
110        let max_phase = 2;
111
112        // send exclusion list length of 0
113        self.send_exclusions()?;
114        let mut file_list = read_file_list(&mut self.rv)?;
115        crate::flist::sort(&mut file_list);
116        // TODO: With -o, get uid list.
117        // TODO: With -g, get gid list.
118
119        if self.protocol_version < 30 {
120            let io_error_count = self
121                .rv
122                .read_i32()
123                .context("Failed to read server error count")?;
124            if io_error_count > 0 {
125                // TODO: Somehow make this, and other soft errors, observable to the API client.
126                warn!("Server reports {} IO errors", io_error_count);
127            }
128        }
129
130        for phase in 1..=max_phase {
131            debug!("Start phase {}", phase);
132
133            self.wv
134                .write_i32(-1)
135                .context("Failed to send phase transition")?; // end of phase 1
136
137            // Server stops here if there were no files.
138            if file_list.is_empty() {
139                info!("Server returned no files, so we're done");
140                self.shutdown()?;
141                return Ok((file_list, ServerStatistics::default()));
142            }
143
144            assert_eq!(
145                self.rv
146                    .read_i32()
147                    .context("Failed to read phase transition")?,
148                -1
149            );
150        }
151
152        debug!("Send end of sequence");
153        self.wv
154            .write_i32(-1)
155            .context("Failed to send end-of-sequence marker")?;
156        // TODO: In later versions (which?) read an end-of-sequence marker?
157        let server_stats = self
158            .read_server_statistics()
159            .context("Failed to read server statistics")?;
160        info!("{:#?}", server_stats);
161
162        // TODO: In later versions, send a final -1 marker.
163        self.shutdown()?;
164        Ok((file_list, server_stats))
165    }
166
167    /// Shut down this connection, consuming the object.
168    ///
169    /// This isn't the drop method, because it only makes sense to do after
170    /// the protocol has reached the natural end.
171    fn shutdown(self) -> Result<()> {
172        let Connection {
173            rv,
174            wv,
175            protocol_version: _,
176            salt: _,
177            mut child,
178            options: _,
179        } = self;
180
181        rv.check_for_eof()?;
182        drop(wv);
183
184        // TODO: Should this be returned, somehow?
185        // TODO: Should we timeout after a while?
186        // TODO: Map rsync return codes to messages.
187        let child_result = child.wait()?;
188        info!("Child process exited: {}", child_result);
189
190        Ok(())
191    }
192
193    fn send_exclusions(&mut self) -> Result<()> {
194        self.wv
195            .write_i32(0)
196            .context("Failed to send exclusion list")
197    }
198
199    fn read_server_statistics(&mut self) -> Result<ServerStatistics> {
200        Ok(ServerStatistics {
201            total_bytes_read: self.rv.read_i64()?,
202            total_bytes_written: self.rv.read_i64()?,
203            total_file_size: self.rv.read_i64()?,
204            flist_build_time: if self.protocol_version >= 29 {
205                Some(self.rv.read_i64()?)
206            } else {
207                None
208            },
209            flist_xfer_time: if self.protocol_version >= 29 {
210                Some(self.rv.read_i64()?)
211            } else {
212                None
213            },
214        })
215    }
216}