marlin_binary_transfer/adapters/
blocking.rs1use std::io::{Read, Write};
46use std::time::Instant;
47
48use crate::adapters::common::resolve_chunk_size;
49use crate::file_transfer::{Compression, FileEvent, FileTransfer};
50use crate::session::Session;
51
52pub use crate::adapters::common::{
53 Progress, ProgressCallback, UploadError, UploadOptions, UploadStats,
54};
55
56pub fn upload<T: Read + Write + ?Sized, S: Read>(
60 transport: &mut T,
61 mut src: S,
62 mut options: UploadOptions,
63) -> Result<UploadStats, UploadError> {
64 transport.write_all(b"M28B1\n")?;
66
67 let mut session = Session::new();
68 let now = Instant::now();
69 session.connect(now);
70
71 drive_session_until_synced(transport, &mut session)?;
72
73 let device_max = session.max_block_size().unwrap_or(0);
76
77 let mut ft = FileTransfer::new(&mut session);
78 ft.query(options.compression.clone(), Instant::now());
79 let negotiated = drive_until_negotiated(transport, &mut ft)?;
80
81 ft.open(&options.dest_filename, options.dummy, Instant::now());
82 drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Opened))?;
83
84 let mut stats = UploadStats {
85 compression: negotiated.clone(),
86 ..UploadStats::default()
87 };
88
89 let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
90 let mut source_bytes = Vec::new();
94 src.read_to_end(&mut source_bytes)?;
95 stats.source_bytes = source_bytes.len() as u64;
96
97 let payload: Vec<u8> = match &negotiated {
98 Compression::None => source_bytes,
99 Compression::Heatshrink { window, lookahead } => {
100 #[cfg(feature = "heatshrink")]
101 {
102 crate::compression::compress(&source_bytes, *window, *lookahead)?
103 }
104 #[cfg(not(feature = "heatshrink"))]
105 {
106 let _ = (window, lookahead);
107 return Err(UploadError::CompressionFeatureDisabled);
108 }
109 }
110 Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
111 };
112
113 for chunk in payload.chunks(chunk_size) {
114 ft.write(chunk, Instant::now());
115 drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked))?;
116 stats.bytes_sent += chunk.len() as u64;
117 stats.chunks_sent += 1;
118 if let Some(cb) = options.progress.as_mut() {
119 cb(Progress {
120 bytes_sent: stats.bytes_sent,
121 chunks_sent: stats.chunks_sent,
122 source_bytes: stats.source_bytes,
123 });
124 }
125 }
126
127 ft.close(Instant::now());
128 drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Closed))?;
129
130 drop(ft);
135 session.send(0, 2, &[], Instant::now());
136 drive_session_until_idle(transport, &mut session)?;
137
138 Ok(stats)
139}
140
141fn drive_session_until_idle<T: Read + Write + ?Sized>(
142 transport: &mut T,
143 session: &mut Session,
144) -> Result<(), UploadError> {
145 use crate::file_transfer::FileError;
146 use crate::session::Event;
147 let mut buf = [0u8; 1024];
148 for _ in 0..200 {
149 while let Some(out) = session.poll_outbound() {
150 transport.write_all(&out)?;
151 }
152 let n = match transport.read(&mut buf) {
153 Ok(n) => n,
154 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
155 Err(e) => return Err(UploadError::Io(e)),
156 };
157 if n > 0 {
158 session.feed(&buf[..n], Instant::now());
159 }
160 while let Some(evt) = session.poll_event() {
161 match evt {
162 Event::Ack(_) => return Ok(()),
163 Event::FatalError => {
164 return Err(UploadError::Transfer(FileError::SessionFatalError));
165 }
166 Event::Timeout { .. } => {
167 return Err(UploadError::Transfer(FileError::SessionTimeout));
168 }
169 Event::OutOfSync { expected, got } => {
170 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
171 expected,
172 got,
173 }));
174 }
175 _ => {}
176 }
177 }
178 session.tick(Instant::now());
179 }
180 Err(UploadError::Stalled("control close not acked"))
181}
182
183fn drive_session_until_synced<T: Read + Write + ?Sized>(
184 transport: &mut T,
185 session: &mut Session,
186) -> Result<(), UploadError> {
187 use crate::file_transfer::FileError;
188 use crate::session::Event;
189 let mut buf = [0u8; 1024];
190 for _ in 0..200 {
191 while let Some(out) = session.poll_outbound() {
192 transport.write_all(&out)?;
193 }
194 let n = match transport.read(&mut buf) {
195 Ok(n) => n,
196 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
197 Err(e) => return Err(UploadError::Io(e)),
198 };
199 if n > 0 {
200 session.feed(&buf[..n], Instant::now());
201 }
202 while let Some(evt) = session.poll_event() {
203 match evt {
204 Event::Synced { .. } => return Ok(()),
205 Event::FatalError => {
206 return Err(UploadError::Transfer(FileError::SessionFatalError));
207 }
208 Event::Timeout { .. } => {
209 return Err(UploadError::Transfer(FileError::SessionTimeout));
210 }
211 Event::OutOfSync { expected, got } => {
212 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
213 expected,
214 got,
215 }));
216 }
217 _ => {}
218 }
219 }
220 session.tick(Instant::now());
221 }
222 Err(UploadError::HandshakeFailed)
223}
224
225fn drive_until_negotiated<T: Read + Write + ?Sized>(
226 transport: &mut T,
227 ft: &mut FileTransfer<'_>,
228) -> Result<Compression, UploadError> {
229 let mut buf = [0u8; 1024];
230 for _ in 0..200 {
231 while let Some(out) = ft.poll_outbound() {
232 transport.write_all(&out)?;
233 }
234 let n = match transport.read(&mut buf) {
235 Ok(n) => n,
236 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
237 Err(e) => return Err(UploadError::Io(e)),
238 };
239 if n > 0 {
240 ft.feed(&buf[..n], Instant::now());
241 }
242 while let Some(evt) = ft.poll() {
243 match evt {
244 FileEvent::Negotiated { compression, .. } => return Ok(compression),
245 FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
246 _ => {}
247 }
248 }
249 ft.tick(Instant::now());
250 }
251 Err(UploadError::Stalled("negotiation did not complete"))
252}
253
254fn drive_until_event<T: Read + Write + ?Sized, F: Fn(&FileEvent) -> bool>(
255 transport: &mut T,
256 ft: &mut FileTransfer<'_>,
257 pred: F,
258) -> Result<(), UploadError> {
259 let mut buf = [0u8; 1024];
260 for _ in 0..200 {
261 while let Some(out) = ft.poll_outbound() {
262 transport.write_all(&out)?;
263 }
264 let n = match transport.read(&mut buf) {
265 Ok(n) => n,
266 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
267 Err(e) => return Err(UploadError::Io(e)),
268 };
269 if n > 0 {
270 ft.feed(&buf[..n], Instant::now());
271 }
272 while let Some(evt) = ft.poll() {
273 if let FileEvent::Failed(err) = &evt {
274 return Err(UploadError::Transfer(err.clone()));
275 }
276 if pred(&evt) {
277 return Ok(());
278 }
279 }
280 ft.tick(Instant::now());
281 }
282 Err(UploadError::Stalled("event did not arrive in time"))
283}