cat_dev/mion/cgis/
dump_memory.rs

//! APIs for interacting with `/dbg/mem_dump.cgi`, a page for live
//! access to the memory of the main chip on MION devices.
//!
4//!
5//! Prefer this over `dbytes` over telnet, as there are some bytes that
6//! cannot be read over telnet, that can be read with this CGI interface.
7
8use crate::{
9	errors::{CatBridgeError, NetworkError, NetworkParseError},
10	mion::{
11		cgis::{AUTHZ_HEADER, encode_url_parameters},
12		proto::cgis::MionCGIErrors,
13	},
14};
15use bytes::{Bytes, BytesMut};
16use fnv::FnvHashMap;
17use futures::{StreamExt, future::Either};
18use reqwest::{Client, Response, Version};
19use std::{
20	fmt::Display,
21	net::Ipv4Addr,
22	ops::Deref,
23	sync::atomic::{AtomicU8, Ordering as AtomicOrdering},
24	time::Duration,
25};
26use tokio::{
27	sync::mpsc::{
28		Receiver as BoundedReceiver, Sender as BoundedSender, channel as bounded_channel,
29	},
30	time::timeout,
31};
32use tracing::debug;
33
/// Highest page start address requested when no early stop is given; the
/// page re-ordering loop also stops once it has passed this address.
const MEMORY_MAX_ADDRESS: usize = 0xFFFF_FE00;
/// Marker that opens the memory table inside the HTML returned by the CGI.
const TABLE_START_SIGIL: &str = "<table border=0 cellspacing=3 cellpadding=3>";
/// Marker that closes the memory table inside the HTML returned by the CGI.
const TABLE_END_SIGIL: &str = "</table>";
/// Threshold for the shared failure counter (reset on every successful
/// response) after which a page fetch stops retrying and reports an error.
const MAX_RETRIES: u8 = 10;
/// Seconds to sleep before a failed page fetch is attempted again.
const BACKOFF_SLEEP_SECONDS: u64 = 10;
/// Timeout in seconds, applied separately to sending the request and to
/// reading the response body.
const MEMORY_TIMEOUT_SECONDS: u64 = 30;
/// Number of page requests kept in flight at the same time.
const MAX_MEMORY_CONCURRENCY: usize = 4;
41
42/// Dump the existing memory for a MION.
43///
44/// ## Errors
45///
46/// - If we cannot encode the parameters as a form url encoded.
47/// - If we cannot make the HTTP request.
48/// - If the server does not respond with a 200.
49/// - If we cannot read the body from HTTP.
50/// - If we cannot parse the HTML response.
51pub async fn dump_memory(
52	mion_ip: Ipv4Addr,
53	resume_at: Option<usize>,
54	early_stop_at: Option<usize>,
55) -> Result<Bytes, CatBridgeError> {
56	let mut memory_buffer = BytesMut::with_capacity(0xFFFF_FFFF);
57	dump_memory_with_raw_client(
58		&Client::default(),
59		mion_ip,
60		resume_at,
61		early_stop_at,
62		|bytes: Vec<u8>| {
63			memory_buffer.extend_from_slice(&bytes);
64		},
65	)
66	.await?;
67	Ok(memory_buffer.freeze())
68}
69
70/// Dump the existing memory for a MION allowing you to write as
71/// dumps happen.
72///
73/// ## Errors
74///
75/// - If we cannot encode the parameters as a form url encoded.
76/// - If we cannot make the HTTP request.
77/// - If the server does not respond with a 200.
78/// - If we cannot read the body from HTTP.
79/// - If we cannot parse the HTML response.
80pub async fn dump_memory_with_writer<FnTy>(
81	mion_ip: Ipv4Addr,
82	resume_at: Option<usize>,
83	early_stop_at: Option<usize>,
84	callback: FnTy,
85) -> Result<(), CatBridgeError>
86where
87	FnTy: FnMut(Vec<u8>) + Send + Sync,
88{
89	dump_memory_with_raw_client(
90		&Client::default(),
91		mion_ip,
92		resume_at,
93		early_stop_at,
94		callback,
95	)
96	.await
97}
98
99/// Perform a memory dump request, but with an already existing HTTP client.
100///
101/// ## Errors
102///
103/// - If we cannot encode the parameters as a form url encoded.
104/// - If we cannot make the HTTP request.
105/// - If the server does not respond with a 200.
106/// - If we cannot read the body from HTTP.
107/// - If we cannot parse the HTML response.
108pub async fn dump_memory_with_raw_client<FnTy>(
109	client: &Client,
110	mion_ip: Ipv4Addr,
111	resume_at: Option<usize>,
112	early_stop_at: Option<usize>,
113	buff_callback: FnTy,
114) -> Result<(), CatBridgeError>
115where
116	FnTy: FnMut(Vec<u8>) + Send + Sync,
117{
118	let (stop_requests_sender, mut stop_requests_consumer) = bounded_channel(1);
119	let retry_counter = AtomicU8::new(0);
120
121	let start_address = resume_at.unwrap_or(0);
122	let (page_results_sender, page_results_consumer) = bounded_channel(512);
123
124	let retry_counter_ref = &retry_counter;
125	let sender_ref = &page_results_sender;
126	// This will make memory page requests concurrently in chunks of
127	// `MAX_MEMORY_CONCURRENCY`, with each task handling it's own retry
128	// logic.
129	//
130	// If requests start throwing errors, the receiving channel will send
131	// a message to `stop_requests_sender` which will shut everything down.
132	let buffered_stream_future = futures::stream::iter(
133		(start_address..=early_stop_at.unwrap_or(MEMORY_MAX_ADDRESS)).step_by(512),
134	)
135	.map(|page_start| async move {
136		loop {
137			if !do_memory_page_fetch(client, mion_ip, page_start, retry_counter_ref, sender_ref)
138				.await
139			{
140				break;
141			}
142		}
143	})
144	.buffered(MAX_MEMORY_CONCURRENCY)
145	.collect::<Vec<()>>();
146
147	// As requests finish, they may not necissarily be in order.
148	// We need to reorder them to ensure they're called back in a
149	// serial order.
150	let join_handle = do_memory_page_ordering(
151		page_results_consumer,
152		stop_requests_sender,
153		start_address,
154		buff_callback,
155	);
156
157	{
158		let recv_future = stop_requests_consumer.recv();
159
160		futures::pin_mut!(buffered_stream_future);
161		futures::pin_mut!(recv_future);
162
163		let (select, _joined) = futures::future::join(
164			// Wait for all requests to finish, or a stop signal caused by an error.
165			futures::future::select(buffered_stream_future, recv_future),
166			// Wait for all of our ordering to finish too.
167			join_handle,
168		)
169		.await;
170		match select {
171			Either::Right((error, _)) => {
172				if let Some(cause) = error {
173					return Err(cause);
174				}
175			}
176			Either::Left(_) => {}
177		}
178	}
179
180	// Double check we didn't get an error ordering all the pages.
181	if let Ok(cause) = stop_requests_consumer.try_recv() {
182		return Err(cause);
183	}
184
185	Ok(())
186}
187
188async fn do_memory_page_ordering<FnTy>(
189	mut results: BoundedReceiver<Result<(usize, Vec<u8>), CatBridgeError>>,
190	stopper: BoundedSender<CatBridgeError>,
191	start_at: usize,
192	mut callback: FnTy,
193) where
194	FnTy: FnMut(Vec<u8>),
195{
196	let mut out_of_order_cache: FnvHashMap<usize, Vec<u8>> = FnvHashMap::default();
197	let mut looking_for_page = start_at;
198
199	while looking_for_page <= MEMORY_MAX_ADDRESS {
200		if let Some(data) = out_of_order_cache.remove(&looking_for_page) {
201			callback(data);
202			looking_for_page += 512;
203			continue;
204		}
205
206		let Some(page_result) = results.recv().await else {
207			_ = stopper.send(CatBridgeError::ClosedChannel).await;
208			break;
209		};
210		match page_result {
211			Ok((addr, data)) => {
212				out_of_order_cache.insert(addr, data);
213			}
214			Err(cause) => {
215				_ = stopper.send(cause).await;
216				break;
217			}
218		}
219	}
220}
221
222async fn do_memory_page_fetch(
223	client: &Client,
224	mion_ip: Ipv4Addr,
225	page_start: usize,
226	retry_counter: &AtomicU8,
227	result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
228) -> bool {
229	let start_addr = format!("{page_start:08X}");
230	debug!(
231		bridge.ip = %mion_ip,
232		addr = %start_addr,
233		"Performing memory page fetch",
234	);
235
236	let timeout_response = timeout(
237		Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
238		do_raw_memory_request(client, mion_ip, &[("start_addr", start_addr)]),
239	)
240	.await;
241
242	let Ok(potential_response) = timeout_response else {
243		if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
244			_ = result_stream
245				.send(Err(NetworkError::Timeout(Duration::from_secs(
246					MEMORY_TIMEOUT_SECONDS,
247				))
248				.into()))
249				.await;
250			return false;
251		}
252		debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
253		tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
254		return true;
255	};
256	let response = match potential_response {
257		Ok(value) => value,
258		Err(cause) => {
259			if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
260				_ = result_stream.send(Err(cause.into())).await;
261				return false;
262			}
263			debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
264			tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
265			return true;
266		}
267	};
268
269	let status = response.status().as_u16();
270	let timeout_body_result = timeout(
271		Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
272		response.bytes(),
273	)
274	.await;
275	let Ok(body_result) = timeout_body_result else {
276		if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
277			_ = result_stream
278				.send(Err(NetworkError::Timeout(Duration::from_secs(
279					MEMORY_TIMEOUT_SECONDS,
280				))
281				.into()))
282				.await;
283			return false;
284		}
285		debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
286		tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
287		return true;
288	};
289
290	retry_counter.store(0, AtomicOrdering::Release);
291	if status != 200 {
292		if let Ok(body) = body_result {
293			_ = result_stream
294				.send(Err(MionCGIErrors::UnexpectedStatusCode(status, body).into()))
295				.await;
296			return false;
297		}
298
299		_ = result_stream
300			.send(Err(MionCGIErrors::UnexpectedStatusCodeNoBody(status).into()))
301			.await;
302		return false;
303	}
304	let read_body_bytes = match body_result.map_err(NetworkError::HTTP) {
305		Ok(value) => value,
306		Err(cause) => {
307			_ = result_stream.send(Err(cause.into())).await;
308			return false;
309		}
310	};
311	let body_as_string =
312		match String::from_utf8(read_body_bytes.into()).map_err(NetworkParseError::Utf8Expected) {
313			Ok(value) => value,
314			Err(cause) => {
315				_ = result_stream.send(Err(cause.into())).await;
316				return false;
317			}
318		};
319
320	process_received_page(page_start, result_stream, &body_as_string).await
321}
322
323async fn process_received_page(
324	page_start: usize,
325	result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
326	body_as_string: &str,
327) -> bool {
328	let table = match extract_memory_table_body(body_as_string) {
329		Ok(value) => value,
330		Err(cause) => {
331			_ = result_stream.send(Err(cause.into())).await;
332			return false;
333		}
334	};
335	let mut page_of_bytes = Vec::with_capacity(512);
336	for table_row in table.split("<tr>").skip(3) {
337		for table_column in table_row
338			.trim()
339			.trim_end_matches("</tbody>")
340			.trim_end()
341			.trim_end_matches("</tr>")
342			.trim_end()
343			.replace("</td>", "")
344			.split("<td>")
345			.skip(3)
346		{
347			if table_column.trim().len() != 2 {
348				_ = result_stream
349					.send(Err(MionCGIErrors::HtmlResponseBadByte(
350						table_column.to_owned(),
351					)
352					.into()))
353					.await;
354				return false;
355			}
356			let byte = match u8::from_str_radix(table_column.trim(), 16)
357				.map_err(|_| MionCGIErrors::HtmlResponseBadByte(table_column.to_owned()))
358			{
359				Ok(value) => value,
360				Err(cause) => {
361					_ = result_stream.send(Err(cause.into())).await;
362					return false;
363				}
364			};
365			page_of_bytes.push(byte);
366		}
367	}
368
369	_ = result_stream.send(Ok((page_start, page_of_bytes))).await;
370	false
371}
372
373fn extract_memory_table_body(body: &str) -> Result<String, MionCGIErrors> {
374	let start = body
375		.find(TABLE_START_SIGIL)
376		.ok_or_else(|| MionCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
377	let body_minus_start = &body[start + TABLE_START_SIGIL.len()..];
378	let end = body_minus_start
379		.find(TABLE_END_SIGIL)
380		.ok_or_else(|| MionCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
381
382	Ok(body_minus_start[..end].to_owned())
383}
384
385/// Perform a raw request on the MION board's `eeprom_dump.cgi` page.
386///
387/// *note: you probably want to call one of the actual methods, as this is
388/// basically just a thin wrapper around an HTTP Post Request. Not doing much
389/// else more. A lot of it requires that you set things up correctly.*
390///
391/// ## Errors
392///
393/// - If we cannot make an HTTP request to the MION Request.
394/// - If we fail to encode your parameters into a request body.
395pub async fn do_raw_memory_request(
396	client: &Client,
397	mion_ip: Ipv4Addr,
398	url_parameters: &[(impl Deref<Target = str>, impl Display)],
399) -> Result<Response, NetworkError> {
400	Ok(client
401		.post(format!("http://{mion_ip}/dbg/mem_dump.cgi"))
402		.version(Version::HTTP_11)
403		.header("authorization", format!("Basic {AUTHZ_HEADER}"))
404		.header("content-type", "application/x-www-form-urlencoded")
405		.header(
406			"user-agent",
407			format!("cat-dev/{}", env!("CARGO_PKG_VERSION")),
408		)
409		.body::<String>(encode_url_parameters(url_parameters))
410		.send()
411		.await?)
412}