marlin_binary_transfer/adapters/
tokio.rs1use std::time::{Duration, Instant};
28
29use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
30
31use crate::adapters::common::resolve_chunk_size;
32use crate::file_transfer::{Compression, FileEvent, FileTransfer};
33use crate::session::Session;
34
35pub use crate::adapters::common::{UploadError, UploadOptions, UploadStats};
36
37pub async fn upload<T, S>(
39 transport: &mut T,
40 src: &mut S,
41 options: UploadOptions,
42) -> Result<UploadStats, UploadError>
43where
44 T: AsyncRead + AsyncWrite + Unpin,
45 S: AsyncRead + Unpin,
46{
47 transport.write_all(b"M28B1\n").await?;
48
49 let mut session = Session::new();
50 session.connect(Instant::now());
51 drive_until_synced(transport, &mut session).await?;
52
53 let device_max = session.max_block_size().unwrap_or(0);
56
57 let mut ft = FileTransfer::new(&mut session);
58 ft.query(options.compression.clone(), Instant::now());
59 let negotiated = drive_until_negotiated(transport, &mut ft).await?;
60
61 ft.open(&options.dest_filename, options.dummy, Instant::now());
62 drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Opened)).await?;
63
64 let mut stats = UploadStats {
65 compression: negotiated.clone(),
66 ..UploadStats::default()
67 };
68
69 let mut source_bytes = Vec::new();
70 src.read_to_end(&mut source_bytes).await?;
71 stats.source_bytes = source_bytes.len() as u64;
72
73 let payload: Vec<u8> = match &negotiated {
74 Compression::None => source_bytes,
75 Compression::Heatshrink { window, lookahead } => {
76 #[cfg(feature = "heatshrink")]
77 {
78 crate::compression::compress(&source_bytes, *window, *lookahead)?
79 }
80 #[cfg(not(feature = "heatshrink"))]
81 {
82 let _ = (window, lookahead);
83 return Err(UploadError::CompressionFeatureDisabled);
84 }
85 }
86 Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
87 };
88
89 let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
90
91 for chunk in payload.chunks(chunk_size) {
92 ft.write(chunk, Instant::now());
93 drive_until(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked)).await?;
94 stats.bytes_sent += chunk.len() as u64;
95 stats.chunks_sent += 1;
96 }
97
98 ft.close(Instant::now());
99 drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Closed)).await?;
100
101 drop(ft);
105 session.send(0, 2, &[], Instant::now());
106 drive_session_until_idle(transport, &mut session).await?;
107
108 Ok(stats)
109}
110
111async fn drive_session_until_idle<T>(
112 transport: &mut T,
113 session: &mut Session,
114) -> Result<(), UploadError>
115where
116 T: AsyncRead + AsyncWrite + Unpin,
117{
118 use crate::file_transfer::FileError;
119 use crate::session::Event;
120 let mut buf = [0u8; 1024];
121 for _ in 0..200 {
122 while let Some(out) = session.poll_outbound() {
123 transport.write_all(&out).await?;
124 }
125 let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
126 if n > 0 {
127 session.feed(&buf[..n], Instant::now());
128 }
129 while let Some(evt) = session.poll_event() {
130 match evt {
131 Event::Ack(_) => return Ok(()),
132 Event::FatalError => {
133 return Err(UploadError::Transfer(FileError::SessionFatalError));
134 }
135 Event::Timeout { .. } => {
136 return Err(UploadError::Transfer(FileError::SessionTimeout));
137 }
138 Event::OutOfSync { expected, got } => {
139 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
140 expected,
141 got,
142 }));
143 }
144 _ => {}
145 }
146 }
147 session.tick(Instant::now());
148 }
149 Err(UploadError::Stalled("control close not acked"))
150}
151
152async fn drive_until_synced<T>(transport: &mut T, session: &mut Session) -> Result<(), UploadError>
153where
154 T: AsyncRead + AsyncWrite + Unpin,
155{
156 use crate::file_transfer::FileError;
157 use crate::session::Event;
158 let mut buf = [0u8; 1024];
159 for _ in 0..200 {
160 while let Some(out) = session.poll_outbound() {
161 transport.write_all(&out).await?;
162 }
163 let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
164 if n > 0 {
165 session.feed(&buf[..n], Instant::now());
166 }
167 while let Some(evt) = session.poll_event() {
168 match evt {
169 Event::Synced { .. } => return Ok(()),
170 Event::FatalError => {
171 return Err(UploadError::Transfer(FileError::SessionFatalError));
172 }
173 Event::Timeout { .. } => {
174 return Err(UploadError::Transfer(FileError::SessionTimeout));
175 }
176 Event::OutOfSync { expected, got } => {
177 return Err(UploadError::Transfer(FileError::SessionOutOfSync {
178 expected,
179 got,
180 }));
181 }
182 _ => {}
183 }
184 }
185 session.tick(Instant::now());
186 }
187 Err(UploadError::HandshakeFailed)
188}
189
190async fn drive_until_negotiated<T>(
191 transport: &mut T,
192 ft: &mut FileTransfer<'_>,
193) -> Result<Compression, UploadError>
194where
195 T: AsyncRead + AsyncWrite + Unpin,
196{
197 let mut buf = [0u8; 1024];
198 for _ in 0..200 {
199 while let Some(out) = ft.poll_outbound() {
200 transport.write_all(&out).await?;
201 }
202 let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
203 if n > 0 {
204 ft.feed(&buf[..n], Instant::now());
205 }
206 while let Some(evt) = ft.poll() {
207 match evt {
208 FileEvent::Negotiated { compression, .. } => return Ok(compression),
209 FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
210 _ => {}
211 }
212 }
213 ft.tick(Instant::now());
214 }
215 Err(UploadError::Stalled("negotiation did not complete"))
216}
217
218async fn drive_until<T, F>(
219 transport: &mut T,
220 ft: &mut FileTransfer<'_>,
221 pred: F,
222) -> Result<(), UploadError>
223where
224 T: AsyncRead + AsyncWrite + Unpin,
225 F: Fn(&FileEvent) -> bool,
226{
227 let mut buf = [0u8; 1024];
228 for _ in 0..200 {
229 while let Some(out) = ft.poll_outbound() {
230 transport.write_all(&out).await?;
231 }
232 let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
233 if n > 0 {
234 ft.feed(&buf[..n], Instant::now());
235 }
236 while let Some(evt) = ft.poll() {
237 if let FileEvent::Failed(err) = &evt {
238 return Err(UploadError::Transfer(err.clone()));
239 }
240 if pred(&evt) {
241 return Ok(());
242 }
243 }
244 ft.tick(Instant::now());
245 }
246 Err(UploadError::Stalled("event did not arrive in time"))
247}
248
249async fn read_with_timeout<T>(
253 transport: &mut T,
254 buf: &mut [u8],
255 timeout: Duration,
256) -> Result<usize, UploadError>
257where
258 T: AsyncRead + Unpin,
259{
260 match tokio::time::timeout(timeout, transport.read(buf)).await {
261 Ok(r) => r.map_err(UploadError::Io),
262 Err(_) => Ok(0),
263 }
264}