marlin_binary_transfer/adapters/
tokio.rs1use std::time::{Duration, Instant};
28
29use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
30
31use crate::adapters::common::resolve_chunk_size;
32use crate::file_transfer::{Compression, FileEvent, FileTransfer};
33use crate::session::Session;
34
35pub use crate::adapters::common::{
36 Progress, ProgressCallback, UploadError, UploadOptions, UploadStats,
37};
38
39pub async fn upload<T, S>(
41 transport: &mut T,
42 src: &mut S,
43 mut options: UploadOptions,
44) -> Result<UploadStats, UploadError>
45where
46 T: AsyncRead + AsyncWrite + Unpin,
47 S: AsyncRead + Unpin,
48{
49 transport.write_all(b"M28B1\n").await?;
50
51 let mut session = Session::new();
52 session.connect(Instant::now());
53 drive_until_synced(transport, &mut session).await?;
54
55 let device_max = session.max_block_size().unwrap_or(0);
58
59 let mut ft = FileTransfer::new(&mut session);
60 ft.query(options.compression.clone(), Instant::now());
61 let negotiated = drive_until_negotiated(transport, &mut ft).await?;
62
63 ft.open(&options.dest_filename, options.dummy, Instant::now());
64 drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Opened)).await?;
65
66 let mut stats = UploadStats {
67 compression: negotiated.clone(),
68 ..UploadStats::default()
69 };
70
71 let mut source_bytes = Vec::new();
72 src.read_to_end(&mut source_bytes).await?;
73 stats.source_bytes = source_bytes.len() as u64;
74
75 let payload: Vec<u8> = match &negotiated {
76 Compression::None => source_bytes,
77 Compression::Heatshrink { window, lookahead } => {
78 #[cfg(feature = "heatshrink")]
79 {
80 crate::compression::compress(&source_bytes, *window, *lookahead)?
81 }
82 #[cfg(not(feature = "heatshrink"))]
83 {
84 let _ = (window, lookahead);
85 return Err(UploadError::CompressionFeatureDisabled);
86 }
87 }
88 Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
89 };
90
91 let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
92
93 for chunk in payload.chunks(chunk_size) {
94 ft.write(chunk, Instant::now());
95 drive_until(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked)).await?;
96 stats.bytes_sent += chunk.len() as u64;
97 stats.chunks_sent += 1;
98 if let Some(cb) = options.progress.as_mut() {
99 cb(Progress {
100 bytes_sent: stats.bytes_sent,
101 chunks_sent: stats.chunks_sent,
102 source_bytes: stats.source_bytes,
103 });
104 }
105 }
106
107 ft.close(Instant::now());
108 drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Closed)).await?;
109
110 drop(ft);
114 session.send(0, 2, &[], Instant::now());
115 drive_session_until_idle(transport, &mut session).await?;
116
117 Ok(stats)
118}
119
120async fn drive_session_until_idle<T>(
121 transport: &mut T,
122 session: &mut Session,
123) -> Result<(), UploadError>
124where
125 T: AsyncRead + AsyncWrite + Unpin,
126{
127 use crate::file_transfer::FileError;
128 use crate::session::Event;
129 let mut buf = [0u8; 1024];
130 for _ in 0..200 {
131 while let Some(out) = session.poll_outbound() {
132 transport.write_all(&out).await?;
133 }
134 let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
135 if n > 0 {
136 session.feed(&buf[..n], Instant::now());
137 }
138 while let Some(evt) = session.poll_event() {
139 match evt {
140 Event::Ack(_) => return Ok(()),
141 Event::FatalError => {
142 return Err(UploadError::Transfer(FileError::SessionFatalError));
143 }
144 Event::Timeout { .. } => {
145 return Err(UploadError::Transfer(FileError::SessionTimeout));
146 }
147 Event::OutOfSync { expected, got } => {
148 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
149 expected,
150 got,
151 }));
152 }
153 _ => {}
154 }
155 }
156 session.tick(Instant::now());
157 }
158 Err(UploadError::Stalled("control close not acked"))
159}
160
161async fn drive_until_synced<T>(transport: &mut T, session: &mut Session) -> Result<(), UploadError>
162where
163 T: AsyncRead + AsyncWrite + Unpin,
164{
165 use crate::file_transfer::FileError;
166 use crate::session::Event;
167 let mut buf = [0u8; 1024];
168 for _ in 0..200 {
169 while let Some(out) = session.poll_outbound() {
170 transport.write_all(&out).await?;
171 }
172 let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
173 if n > 0 {
174 session.feed(&buf[..n], Instant::now());
175 }
176 while let Some(evt) = session.poll_event() {
177 match evt {
178 Event::Synced { .. } => return Ok(()),
179 Event::FatalError => {
180 return Err(UploadError::Transfer(FileError::SessionFatalError));
181 }
182 Event::Timeout { .. } => {
183 return Err(UploadError::Transfer(FileError::SessionTimeout));
184 }
185 Event::OutOfSync { expected, got } => {
186 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
187 expected,
188 got,
189 }));
190 }
191 _ => {}
192 }
193 }
194 session.tick(Instant::now());
195 }
196 Err(UploadError::HandshakeFailed)
197}
198
199async fn drive_until_negotiated<T>(
200 transport: &mut T,
201 ft: &mut FileTransfer<'_>,
202) -> Result<Compression, UploadError>
203where
204 T: AsyncRead + AsyncWrite + Unpin,
205{
206 let mut buf = [0u8; 1024];
207 for _ in 0..200 {
208 while let Some(out) = ft.poll_outbound() {
209 transport.write_all(&out).await?;
210 }
211 let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
212 if n > 0 {
213 ft.feed(&buf[..n], Instant::now());
214 }
215 while let Some(evt) = ft.poll() {
216 match evt {
217 FileEvent::Negotiated { compression, .. } => return Ok(compression),
218 FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
219 _ => {}
220 }
221 }
222 ft.tick(Instant::now());
223 }
224 Err(UploadError::Stalled("negotiation did not complete"))
225}
226
227async fn drive_until<T, F>(
228 transport: &mut T,
229 ft: &mut FileTransfer<'_>,
230 pred: F,
231) -> Result<(), UploadError>
232where
233 T: AsyncRead + AsyncWrite + Unpin,
234 F: Fn(&FileEvent) -> bool,
235{
236 let mut buf = [0u8; 1024];
237 for _ in 0..200 {
238 while let Some(out) = ft.poll_outbound() {
239 transport.write_all(&out).await?;
240 }
241 let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
242 if n > 0 {
243 ft.feed(&buf[..n], Instant::now());
244 }
245 while let Some(evt) = ft.poll() {
246 if let FileEvent::Failed(err) = &evt {
247 return Err(UploadError::Transfer(err.clone()));
248 }
249 if pred(&evt) {
250 return Ok(());
251 }
252 }
253 ft.tick(Instant::now());
254 }
255 Err(UploadError::Stalled("event did not arrive in time"))
256}
257
258async fn read_with_timeout<T>(
262 transport: &mut T,
263 buf: &mut [u8],
264 timeout: Duration,
265) -> Result<usize, UploadError>
266where
267 T: AsyncRead + Unpin,
268{
269 match tokio::time::timeout(timeout, transport.read(buf)).await {
270 Ok(r) => r.map_err(UploadError::Io),
271 Err(_) => Ok(0),
272 }
273}