marlin_binary_transfer/adapters/
blocking.rs1use std::io::{Read, Write};
46use std::time::Instant;
47
48use crate::adapters::common::resolve_chunk_size;
49use crate::file_transfer::{Compression, FileEvent, FileTransfer};
50use crate::session::Session;
51
52pub use crate::adapters::common::{UploadError, UploadOptions, UploadStats};
53
54pub fn upload<T: Read + Write + ?Sized, S: Read>(
58 transport: &mut T,
59 mut src: S,
60 options: UploadOptions,
61) -> Result<UploadStats, UploadError> {
62 transport.write_all(b"M28B1\n")?;
64
65 let mut session = Session::new();
66 let now = Instant::now();
67 session.connect(now);
68
69 drive_session_until_synced(transport, &mut session)?;
70
71 let device_max = session.max_block_size().unwrap_or(0);
74
75 let mut ft = FileTransfer::new(&mut session);
76 ft.query(options.compression.clone(), Instant::now());
77 let negotiated = drive_until_negotiated(transport, &mut ft)?;
78
79 ft.open(&options.dest_filename, options.dummy, Instant::now());
80 drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Opened))?;
81
82 let mut stats = UploadStats {
83 compression: negotiated.clone(),
84 ..UploadStats::default()
85 };
86
87 let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
88 let mut source_bytes = Vec::new();
92 src.read_to_end(&mut source_bytes)?;
93 stats.source_bytes = source_bytes.len() as u64;
94
95 let payload: Vec<u8> = match &negotiated {
96 Compression::None => source_bytes,
97 Compression::Heatshrink { window, lookahead } => {
98 #[cfg(feature = "heatshrink")]
99 {
100 crate::compression::compress(&source_bytes, *window, *lookahead)?
101 }
102 #[cfg(not(feature = "heatshrink"))]
103 {
104 let _ = (window, lookahead);
105 return Err(UploadError::CompressionFeatureDisabled);
106 }
107 }
108 Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
109 };
110
111 for chunk in payload.chunks(chunk_size) {
112 ft.write(chunk, Instant::now());
113 drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked))?;
114 stats.bytes_sent += chunk.len() as u64;
115 stats.chunks_sent += 1;
116 }
117
118 ft.close(Instant::now());
119 drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Closed))?;
120
121 drop(ft);
126 session.send(0, 2, &[], Instant::now());
127 drive_session_until_idle(transport, &mut session)?;
128
129 Ok(stats)
130}
131
132fn drive_session_until_idle<T: Read + Write + ?Sized>(
133 transport: &mut T,
134 session: &mut Session,
135) -> Result<(), UploadError> {
136 use crate::file_transfer::FileError;
137 use crate::session::Event;
138 let mut buf = [0u8; 1024];
139 for _ in 0..200 {
140 while let Some(out) = session.poll_outbound() {
141 transport.write_all(&out)?;
142 }
143 let n = match transport.read(&mut buf) {
144 Ok(n) => n,
145 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
146 Err(e) => return Err(UploadError::Io(e)),
147 };
148 if n > 0 {
149 session.feed(&buf[..n], Instant::now());
150 }
151 while let Some(evt) = session.poll_event() {
152 match evt {
153 Event::Ack(_) => return Ok(()),
154 Event::FatalError => {
155 return Err(UploadError::Transfer(FileError::SessionFatalError));
156 }
157 Event::Timeout { .. } => {
158 return Err(UploadError::Transfer(FileError::SessionTimeout));
159 }
160 Event::OutOfSync { expected, got } => {
161 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
162 expected,
163 got,
164 }));
165 }
166 _ => {}
167 }
168 }
169 session.tick(Instant::now());
170 }
171 Err(UploadError::Stalled("control close not acked"))
172}
173
174fn drive_session_until_synced<T: Read + Write + ?Sized>(
175 transport: &mut T,
176 session: &mut Session,
177) -> Result<(), UploadError> {
178 use crate::file_transfer::FileError;
179 use crate::session::Event;
180 let mut buf = [0u8; 1024];
181 for _ in 0..200 {
182 while let Some(out) = session.poll_outbound() {
183 transport.write_all(&out)?;
184 }
185 let n = match transport.read(&mut buf) {
186 Ok(n) => n,
187 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
188 Err(e) => return Err(UploadError::Io(e)),
189 };
190 if n > 0 {
191 session.feed(&buf[..n], Instant::now());
192 }
193 while let Some(evt) = session.poll_event() {
194 match evt {
195 Event::Synced { .. } => return Ok(()),
196 Event::FatalError => {
197 return Err(UploadError::Transfer(FileError::SessionFatalError));
198 }
199 Event::Timeout { .. } => {
200 return Err(UploadError::Transfer(FileError::SessionTimeout));
201 }
202 Event::OutOfSync { expected, got } => {
203 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
204 expected,
205 got,
206 }));
207 }
208 _ => {}
209 }
210 }
211 session.tick(Instant::now());
212 }
213 Err(UploadError::HandshakeFailed)
214}
215
216fn drive_until_negotiated<T: Read + Write + ?Sized>(
217 transport: &mut T,
218 ft: &mut FileTransfer<'_>,
219) -> Result<Compression, UploadError> {
220 let mut buf = [0u8; 1024];
221 for _ in 0..200 {
222 while let Some(out) = ft.poll_outbound() {
223 transport.write_all(&out)?;
224 }
225 let n = match transport.read(&mut buf) {
226 Ok(n) => n,
227 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
228 Err(e) => return Err(UploadError::Io(e)),
229 };
230 if n > 0 {
231 ft.feed(&buf[..n], Instant::now());
232 }
233 while let Some(evt) = ft.poll() {
234 match evt {
235 FileEvent::Negotiated { compression, .. } => return Ok(compression),
236 FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
237 _ => {}
238 }
239 }
240 ft.tick(Instant::now());
241 }
242 Err(UploadError::Stalled("negotiation did not complete"))
243}
244
245fn drive_until_event<T: Read + Write + ?Sized, F: Fn(&FileEvent) -> bool>(
246 transport: &mut T,
247 ft: &mut FileTransfer<'_>,
248 pred: F,
249) -> Result<(), UploadError> {
250 let mut buf = [0u8; 1024];
251 for _ in 0..200 {
252 while let Some(out) = ft.poll_outbound() {
253 transport.write_all(&out)?;
254 }
255 let n = match transport.read(&mut buf) {
256 Ok(n) => n,
257 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
258 Err(e) => return Err(UploadError::Io(e)),
259 };
260 if n > 0 {
261 ft.feed(&buf[..n], Instant::now());
262 }
263 while let Some(evt) = ft.poll() {
264 if let FileEvent::Failed(err) = &evt {
265 return Err(UploadError::Transfer(err.clone()));
266 }
267 if pred(&evt) {
268 return Ok(());
269 }
270 }
271 ft.tick(Instant::now());
272 }
273 Err(UploadError::Stalled("event did not arrive in time"))
274}