1use crate::{
9 errors::{CatBridgeError, NetworkError, NetworkParseError},
10 mion::{
11 cgis::{AUTHZ_HEADER, encode_url_parameters},
12 proto::cgis::MionCGIErrors,
13 },
14};
15use bytes::{Bytes, BytesMut};
16use fnv::FnvHashMap;
17use futures::{StreamExt, future::Either};
18use reqwest::{Client, Response, Version};
19use std::{
20 fmt::Display,
21 net::Ipv4Addr,
22 ops::Deref,
23 sync::atomic::{AtomicU8, Ordering as AtomicOrdering},
24 time::Duration,
25};
26use tokio::{
27 sync::mpsc::{
28 Receiver as BoundedReceiver, Sender as BoundedSender, channel as bounded_channel,
29 },
30 time::timeout,
31};
32use tracing::debug;
33
34const MEMORY_MAX_ADDRESS: usize = 0xFFFF_FE00;
35const TABLE_START_SIGIL: &str = "<table border=0 cellspacing=3 cellpadding=3>";
36const TABLE_END_SIGIL: &str = "</table>";
37const MAX_RETRIES: u8 = 10;
38const BACKOFF_SLEEP_SECONDS: u64 = 10;
39const MEMORY_TIMEOUT_SECONDS: u64 = 30;
40const MAX_MEMORY_CONCURRENCY: usize = 4;
41
42pub async fn dump_memory(
52 mion_ip: Ipv4Addr,
53 resume_at: Option<usize>,
54 early_stop_at: Option<usize>,
55) -> Result<Bytes, CatBridgeError> {
56 let mut memory_buffer = BytesMut::with_capacity(0xFFFF_FFFF);
57 dump_memory_with_raw_client(
58 &Client::default(),
59 mion_ip,
60 resume_at,
61 early_stop_at,
62 |bytes: Vec<u8>| {
63 memory_buffer.extend_from_slice(&bytes);
64 },
65 )
66 .await?;
67 Ok(memory_buffer.freeze())
68}
69
70pub async fn dump_memory_with_writer<FnTy>(
81 mion_ip: Ipv4Addr,
82 resume_at: Option<usize>,
83 early_stop_at: Option<usize>,
84 callback: FnTy,
85) -> Result<(), CatBridgeError>
86where
87 FnTy: FnMut(Vec<u8>) + Send + Sync,
88{
89 dump_memory_with_raw_client(
90 &Client::default(),
91 mion_ip,
92 resume_at,
93 early_stop_at,
94 callback,
95 )
96 .await
97}
98
99pub async fn dump_memory_with_raw_client<FnTy>(
109 client: &Client,
110 mion_ip: Ipv4Addr,
111 resume_at: Option<usize>,
112 early_stop_at: Option<usize>,
113 buff_callback: FnTy,
114) -> Result<(), CatBridgeError>
115where
116 FnTy: FnMut(Vec<u8>) + Send + Sync,
117{
118 let (stop_requests_sender, mut stop_requests_consumer) = bounded_channel(1);
119 let retry_counter = AtomicU8::new(0);
120
121 let start_address = resume_at.unwrap_or(0);
122 let (page_results_sender, page_results_consumer) = bounded_channel(512);
123
124 let retry_counter_ref = &retry_counter;
125 let sender_ref = &page_results_sender;
126 let buffered_stream_future = futures::stream::iter(
133 (start_address..=early_stop_at.unwrap_or(MEMORY_MAX_ADDRESS)).step_by(512),
134 )
135 .map(|page_start| async move {
136 loop {
137 if !do_memory_page_fetch(client, mion_ip, page_start, retry_counter_ref, sender_ref)
138 .await
139 {
140 break;
141 }
142 }
143 })
144 .buffered(MAX_MEMORY_CONCURRENCY)
145 .collect::<Vec<()>>();
146
147 let join_handle = do_memory_page_ordering(
151 page_results_consumer,
152 stop_requests_sender,
153 start_address,
154 buff_callback,
155 );
156
157 {
158 let recv_future = stop_requests_consumer.recv();
159
160 futures::pin_mut!(buffered_stream_future);
161 futures::pin_mut!(recv_future);
162
163 let (select, _joined) = futures::future::join(
164 futures::future::select(buffered_stream_future, recv_future),
166 join_handle,
168 )
169 .await;
170 match select {
171 Either::Right((error, _)) => {
172 if let Some(cause) = error {
173 return Err(cause);
174 }
175 }
176 Either::Left(_) => {}
177 }
178 }
179
180 if let Ok(cause) = stop_requests_consumer.try_recv() {
182 return Err(cause);
183 }
184
185 Ok(())
186}
187
188async fn do_memory_page_ordering<FnTy>(
189 mut results: BoundedReceiver<Result<(usize, Vec<u8>), CatBridgeError>>,
190 stopper: BoundedSender<CatBridgeError>,
191 start_at: usize,
192 mut callback: FnTy,
193) where
194 FnTy: FnMut(Vec<u8>),
195{
196 let mut out_of_order_cache: FnvHashMap<usize, Vec<u8>> = FnvHashMap::default();
197 let mut looking_for_page = start_at;
198
199 while looking_for_page <= MEMORY_MAX_ADDRESS {
200 if let Some(data) = out_of_order_cache.remove(&looking_for_page) {
201 callback(data);
202 looking_for_page += 512;
203 continue;
204 }
205
206 let Some(page_result) = results.recv().await else {
207 _ = stopper.send(CatBridgeError::ClosedChannel).await;
208 break;
209 };
210 match page_result {
211 Ok((addr, data)) => {
212 out_of_order_cache.insert(addr, data);
213 }
214 Err(cause) => {
215 _ = stopper.send(cause).await;
216 break;
217 }
218 }
219 }
220}
221
222async fn do_memory_page_fetch(
223 client: &Client,
224 mion_ip: Ipv4Addr,
225 page_start: usize,
226 retry_counter: &AtomicU8,
227 result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
228) -> bool {
229 let start_addr = format!("{page_start:08X}");
230 debug!(
231 bridge.ip = %mion_ip,
232 addr = %start_addr,
233 "Performing memory page fetch",
234 );
235
236 let timeout_response = timeout(
237 Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
238 do_raw_memory_request(client, mion_ip, &[("start_addr", start_addr)]),
239 )
240 .await;
241
242 let Ok(potential_response) = timeout_response else {
243 if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
244 _ = result_stream
245 .send(Err(NetworkError::Timeout(Duration::from_secs(
246 MEMORY_TIMEOUT_SECONDS,
247 ))
248 .into()))
249 .await;
250 return false;
251 }
252 debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
253 tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
254 return true;
255 };
256 let response = match potential_response {
257 Ok(value) => value,
258 Err(cause) => {
259 if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
260 _ = result_stream.send(Err(cause.into())).await;
261 return false;
262 }
263 debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
264 tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
265 return true;
266 }
267 };
268
269 let status = response.status().as_u16();
270 let timeout_body_result = timeout(
271 Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
272 response.bytes(),
273 )
274 .await;
275 let Ok(body_result) = timeout_body_result else {
276 if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
277 _ = result_stream
278 .send(Err(NetworkError::Timeout(Duration::from_secs(
279 MEMORY_TIMEOUT_SECONDS,
280 ))
281 .into()))
282 .await;
283 return false;
284 }
285 debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
286 tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
287 return true;
288 };
289
290 retry_counter.store(0, AtomicOrdering::Release);
291 if status != 200 {
292 if let Ok(body) = body_result {
293 _ = result_stream
294 .send(Err(MionCGIErrors::UnexpectedStatusCode(status, body).into()))
295 .await;
296 return false;
297 }
298
299 _ = result_stream
300 .send(Err(MionCGIErrors::UnexpectedStatusCodeNoBody(status).into()))
301 .await;
302 return false;
303 }
304 let read_body_bytes = match body_result.map_err(NetworkError::HTTP) {
305 Ok(value) => value,
306 Err(cause) => {
307 _ = result_stream.send(Err(cause.into())).await;
308 return false;
309 }
310 };
311 let body_as_string =
312 match String::from_utf8(read_body_bytes.into()).map_err(NetworkParseError::Utf8Expected) {
313 Ok(value) => value,
314 Err(cause) => {
315 _ = result_stream.send(Err(cause.into())).await;
316 return false;
317 }
318 };
319
320 process_received_page(page_start, result_stream, &body_as_string).await
321}
322
323async fn process_received_page(
324 page_start: usize,
325 result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
326 body_as_string: &str,
327) -> bool {
328 let table = match extract_memory_table_body(body_as_string) {
329 Ok(value) => value,
330 Err(cause) => {
331 _ = result_stream.send(Err(cause.into())).await;
332 return false;
333 }
334 };
335 let mut page_of_bytes = Vec::with_capacity(512);
336 for table_row in table.split("<tr>").skip(3) {
337 for table_column in table_row
338 .trim()
339 .trim_end_matches("</tbody>")
340 .trim_end()
341 .trim_end_matches("</tr>")
342 .trim_end()
343 .replace("</td>", "")
344 .split("<td>")
345 .skip(3)
346 {
347 if table_column.trim().len() != 2 {
348 _ = result_stream
349 .send(Err(MionCGIErrors::HtmlResponseBadByte(
350 table_column.to_owned(),
351 )
352 .into()))
353 .await;
354 return false;
355 }
356 let byte = match u8::from_str_radix(table_column.trim(), 16)
357 .map_err(|_| MionCGIErrors::HtmlResponseBadByte(table_column.to_owned()))
358 {
359 Ok(value) => value,
360 Err(cause) => {
361 _ = result_stream.send(Err(cause.into())).await;
362 return false;
363 }
364 };
365 page_of_bytes.push(byte);
366 }
367 }
368
369 _ = result_stream.send(Ok((page_start, page_of_bytes))).await;
370 false
371}
372
373fn extract_memory_table_body(body: &str) -> Result<String, MionCGIErrors> {
374 let start = body
375 .find(TABLE_START_SIGIL)
376 .ok_or_else(|| MionCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
377 let body_minus_start = &body[start + TABLE_START_SIGIL.len()..];
378 let end = body_minus_start
379 .find(TABLE_END_SIGIL)
380 .ok_or_else(|| MionCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
381
382 Ok(body_minus_start[..end].to_owned())
383}
384
385pub async fn do_raw_memory_request(
396 client: &Client,
397 mion_ip: Ipv4Addr,
398 url_parameters: &[(impl Deref<Target = str>, impl Display)],
399) -> Result<Response, NetworkError> {
400 Ok(client
401 .post(format!("http://{mion_ip}/dbg/mem_dump.cgi"))
402 .version(Version::HTTP_11)
403 .header("authorization", format!("Basic {AUTHZ_HEADER}"))
404 .header("content-type", "application/x-www-form-urlencoded")
405 .header(
406 "user-agent",
407 format!("cat-dev/{}", env!("CARGO_PKG_VERSION")),
408 )
409 .body::<String>(encode_url_parameters(url_parameters))
410 .send()
411 .await?)
412}