1use crate::{
9 errors::{CatBridgeError, NetworkError, NetworkParseError},
10 mion::{cgis::AUTHZ_HEADER, proto::cgis::MIONCGIErrors},
11};
12use bytes::{Bytes, BytesMut};
13use fnv::FnvHashMap;
14use futures::{future::Either, StreamExt};
15use reqwest::{Client, Response, Version};
16use serde::Serialize;
17use std::{
18 net::Ipv4Addr,
19 sync::atomic::{AtomicU8, Ordering as AtomicOrdering},
20 time::Duration,
21};
22use tokio::{
23 sync::mpsc::{
24 channel as bounded_channel, Receiver as BoundedReceiver, Sender as BoundedSender,
25 },
26 time::timeout,
27};
28use tracing::debug;
29
30const MEMORY_MAX_ADDRESS: usize = 0xFFFF_FE00;
31const TABLE_START_SIGIL: &str = "<table border=0 cellspacing=3 cellpadding=3>";
32const TABLE_END_SIGIL: &str = "</table>";
33const MAX_RETRIES: u8 = 10;
34const BACKOFF_SLEEP_SECONDS: u64 = 10;
35const MEMORY_TIMEOUT_SECONDS: u64 = 30;
36const MAX_MEMORY_CONCURRENCY: usize = 4;
37
38pub async fn dump_memory(
48 mion_ip: Ipv4Addr,
49 resume_at: Option<usize>,
50 early_stop_at: Option<usize>,
51) -> Result<Bytes, CatBridgeError> {
52 let mut memory_buffer = BytesMut::with_capacity(0xFFFF_FFFF);
53 dump_memory_with_raw_client(
54 &Client::default(),
55 mion_ip,
56 resume_at,
57 early_stop_at,
58 |bytes: Vec<u8>| {
59 memory_buffer.extend_from_slice(&bytes);
60 },
61 )
62 .await?;
63 Ok(memory_buffer.freeze())
64}
65
66pub async fn dump_memory_with_writer<FnTy>(
77 mion_ip: Ipv4Addr,
78 resume_at: Option<usize>,
79 early_stop_at: Option<usize>,
80 callback: FnTy,
81) -> Result<(), CatBridgeError>
82where
83 FnTy: FnMut(Vec<u8>) + Send + Sync,
84{
85 dump_memory_with_raw_client(
86 &Client::default(),
87 mion_ip,
88 resume_at,
89 early_stop_at,
90 callback,
91 )
92 .await
93}
94
95pub async fn dump_memory_with_raw_client<FnTy>(
105 client: &Client,
106 mion_ip: Ipv4Addr,
107 resume_at: Option<usize>,
108 early_stop_at: Option<usize>,
109 buff_callback: FnTy,
110) -> Result<(), CatBridgeError>
111where
112 FnTy: FnMut(Vec<u8>) + Send + Sync,
113{
114 let (stop_requests_sender, mut stop_requests_consumer) = bounded_channel(1);
115 let retry_counter = AtomicU8::new(0);
116
117 let start_address = resume_at.unwrap_or(0);
118 let (page_results_sender, page_results_consumer) = bounded_channel(512);
119
120 let retry_counter_ref = &retry_counter;
121 let sender_ref = &page_results_sender;
122 let buffered_stream_future = futures::stream::iter(
129 (start_address..=early_stop_at.unwrap_or(MEMORY_MAX_ADDRESS)).step_by(512),
130 )
131 .map(|page_start| async move {
132 loop {
133 if !do_memory_page_fetch(client, mion_ip, page_start, retry_counter_ref, sender_ref)
134 .await
135 {
136 break;
137 }
138 }
139 })
140 .buffered(MAX_MEMORY_CONCURRENCY)
141 .collect::<Vec<()>>();
142
143 let join_handle = do_memory_page_ordering(
147 page_results_consumer,
148 stop_requests_sender,
149 start_address,
150 buff_callback,
151 );
152
153 {
154 let recv_future = stop_requests_consumer.recv();
155
156 futures::pin_mut!(buffered_stream_future);
157 futures::pin_mut!(recv_future);
158
159 let (select, _joined) = futures::future::join(
160 futures::future::select(buffered_stream_future, recv_future),
162 join_handle,
164 )
165 .await;
166 match select {
167 Either::Right((error, _)) => {
168 if let Some(cause) = error {
169 return Err(cause);
170 }
171 }
172 Either::Left(_) => {}
173 }
174 }
175
176 if let Ok(cause) = stop_requests_consumer.try_recv() {
178 return Err(cause);
179 }
180
181 Ok(())
182}
183
184async fn do_memory_page_ordering<FnTy>(
185 mut results: BoundedReceiver<Result<(usize, Vec<u8>), CatBridgeError>>,
186 stopper: BoundedSender<CatBridgeError>,
187 start_at: usize,
188 mut callback: FnTy,
189) where
190 FnTy: FnMut(Vec<u8>),
191{
192 let mut out_of_order_cache: FnvHashMap<usize, Vec<u8>> = FnvHashMap::default();
193 let mut looking_for_page = start_at;
194
195 while looking_for_page <= MEMORY_MAX_ADDRESS {
196 if let Some(data) = out_of_order_cache.remove(&looking_for_page) {
197 callback(data);
198 looking_for_page += 512;
199 continue;
200 }
201
202 let Some(page_result) = results.recv().await else {
203 _ = stopper.send(CatBridgeError::ClosedChannel).await;
204 break;
205 };
206 match page_result {
207 Ok((addr, data)) => {
208 out_of_order_cache.insert(addr, data);
209 }
210 Err(cause) => {
211 _ = stopper.send(cause).await;
212 break;
213 }
214 }
215 }
216}
217
218async fn do_memory_page_fetch(
219 client: &Client,
220 mion_ip: Ipv4Addr,
221 page_start: usize,
222 retry_counter: &AtomicU8,
223 result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
224) -> bool {
225 let start_addr = format!("{page_start:08X}");
226 debug!(
227 bridge.ip = %mion_ip,
228 addr = %start_addr,
229 "Performing memory page fetch",
230 );
231
232 let timeout_response = timeout(
233 Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
234 do_raw_memory_request(client, mion_ip, &[("start_addr", start_addr)]),
235 )
236 .await;
237
238 let Ok(potential_response) = timeout_response else {
239 if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
240 _ = result_stream
241 .send(Err(NetworkError::Timeout(Duration::from_secs(
242 MEMORY_TIMEOUT_SECONDS,
243 ))
244 .into()))
245 .await;
246 return false;
247 }
248 debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
249 tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
250 return true;
251 };
252 let response = match potential_response {
253 Ok(value) => value,
254 Err(cause) => {
255 if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
256 _ = result_stream.send(Err(cause.into())).await;
257 return false;
258 }
259 debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
260 tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
261 return true;
262 }
263 };
264
265 let status = response.status().as_u16();
266 let timeout_body_result = timeout(
267 Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
268 response.bytes(),
269 )
270 .await;
271 let Ok(body_result) = timeout_body_result else {
272 if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
273 _ = result_stream
274 .send(Err(NetworkError::Timeout(Duration::from_secs(
275 MEMORY_TIMEOUT_SECONDS,
276 ))
277 .into()))
278 .await;
279 return false;
280 }
281 debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
282 tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
283 return true;
284 };
285
286 retry_counter.store(0, AtomicOrdering::Release);
287 if status != 200 {
288 if let Ok(body) = body_result {
289 _ = result_stream
290 .send(Err(MIONCGIErrors::UnexpectedStatusCode(status, body).into()))
291 .await;
292 return false;
293 }
294
295 _ = result_stream
296 .send(Err(MIONCGIErrors::UnexpectedStatusCodeNoBody(status).into()))
297 .await;
298 return false;
299 }
300 let read_body_bytes = match body_result.map_err(NetworkError::HTTP) {
301 Ok(value) => value,
302 Err(cause) => {
303 _ = result_stream.send(Err(cause.into())).await;
304 return false;
305 }
306 };
307 let body_as_string =
308 match String::from_utf8(read_body_bytes.into()).map_err(NetworkParseError::Utf8Expected) {
309 Ok(value) => value,
310 Err(cause) => {
311 _ = result_stream.send(Err(cause.into())).await;
312 return false;
313 }
314 };
315
316 process_received_page(page_start, result_stream, &body_as_string).await
317}
318
319async fn process_received_page(
320 page_start: usize,
321 result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
322 body_as_string: &str,
323) -> bool {
324 let table = match extract_memory_table_body(body_as_string) {
325 Ok(value) => value,
326 Err(cause) => {
327 _ = result_stream.send(Err(cause.into())).await;
328 return false;
329 }
330 };
331 let mut page_of_bytes = Vec::with_capacity(512);
332 for table_row in table.split("<tr>").skip(3) {
333 for table_column in table_row
334 .trim()
335 .trim_end_matches("</tbody>")
336 .trim_end()
337 .trim_end_matches("</tr>")
338 .trim_end()
339 .replace("</td>", "")
340 .split("<td>")
341 .skip(3)
342 {
343 if table_column.trim().len() != 2 {
344 _ = result_stream
345 .send(Err(MIONCGIErrors::HtmlResponseBadByte(
346 table_column.to_owned(),
347 )
348 .into()))
349 .await;
350 return false;
351 }
352 let byte = match u8::from_str_radix(table_column.trim(), 16)
353 .map_err(|_| MIONCGIErrors::HtmlResponseBadByte(table_column.to_owned()))
354 {
355 Ok(value) => value,
356 Err(cause) => {
357 _ = result_stream.send(Err(cause.into())).await;
358 return false;
359 }
360 };
361 page_of_bytes.push(byte);
362 }
363 }
364
365 _ = result_stream.send(Ok((page_start, page_of_bytes))).await;
366 false
367}
368
369fn extract_memory_table_body(body: &str) -> Result<String, MIONCGIErrors> {
370 let start = body
371 .find(TABLE_START_SIGIL)
372 .ok_or_else(|| MIONCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
373 let body_minus_start = &body[start + TABLE_START_SIGIL.len()..];
374 let end = body_minus_start
375 .find(TABLE_END_SIGIL)
376 .ok_or_else(|| MIONCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
377
378 Ok(body_minus_start[..end].to_owned())
379}
380
381pub async fn do_raw_memory_request<UrlEncodableType>(
392 client: &Client,
393 mion_ip: Ipv4Addr,
394 url_parameters: UrlEncodableType,
395) -> Result<Response, NetworkError>
396where
397 UrlEncodableType: Serialize,
398{
399 Ok(client
400 .post(format!("http://{mion_ip}/dbg/mem_dump.cgi"))
401 .version(Version::HTTP_11)
402 .header("authorization", format!("Basic {AUTHZ_HEADER}"))
403 .header("content-type", "application/x-www-form-urlencoded")
404 .header(
405 "user-agent",
406 format!("cat-dev/{}", env!("CARGO_PKG_VERSION")),
407 )
408 .body::<String>(
409 serde_urlencoded::to_string(&url_parameters)
410 .map_err(MIONCGIErrors::FormDataEncodeError)?,
411 )
412 .send()
413 .await?)
414}