cat_dev/mion/cgis/
dump_memory.rs

1//! API's for interacting with `/dbg/mem_dump.cgi`, a page for live
2//! accessing the memory of the memory on the main chip of the MION
3//! devices.
4//!
5//! Prefer this over `dbytes` over telnet, as there are some bytes that
6//! cannot be read over telnet, that can be read with this CGI interface.
7
8use crate::{
9	errors::{CatBridgeError, NetworkError, NetworkParseError},
10	mion::{cgis::AUTHZ_HEADER, proto::cgis::MIONCGIErrors},
11};
12use bytes::{Bytes, BytesMut};
13use fnv::FnvHashMap;
14use futures::{future::Either, StreamExt};
15use reqwest::{Client, Response, Version};
16use serde::Serialize;
17use std::{
18	net::Ipv4Addr,
19	sync::atomic::{AtomicU8, Ordering as AtomicOrdering},
20	time::Duration,
21};
22use tokio::{
23	sync::mpsc::{
24		channel as bounded_channel, Receiver as BoundedReceiver, Sender as BoundedSender,
25	},
26	time::timeout,
27};
28use tracing::debug;
29
30const MEMORY_MAX_ADDRESS: usize = 0xFFFF_FE00;
31const TABLE_START_SIGIL: &str = "<table border=0 cellspacing=3 cellpadding=3>";
32const TABLE_END_SIGIL: &str = "</table>";
33const MAX_RETRIES: u8 = 10;
34const BACKOFF_SLEEP_SECONDS: u64 = 10;
35const MEMORY_TIMEOUT_SECONDS: u64 = 30;
36const MAX_MEMORY_CONCURRENCY: usize = 4;
37
38/// Dump the existing memory for a MION.
39///
40/// ## Errors
41///
42/// - If we cannot encode the parameters as a form url encoded.
43/// - If we cannot make the HTTP request.
44/// - If the server does not respond with a 200.
45/// - If we cannot read the body from HTTP.
46/// - If we cannot parse the HTML response.
47pub async fn dump_memory(
48	mion_ip: Ipv4Addr,
49	resume_at: Option<usize>,
50	early_stop_at: Option<usize>,
51) -> Result<Bytes, CatBridgeError> {
52	let mut memory_buffer = BytesMut::with_capacity(0xFFFF_FFFF);
53	dump_memory_with_raw_client(
54		&Client::default(),
55		mion_ip,
56		resume_at,
57		early_stop_at,
58		|bytes: Vec<u8>| {
59			memory_buffer.extend_from_slice(&bytes);
60		},
61	)
62	.await?;
63	Ok(memory_buffer.freeze())
64}
65
66/// Dump the existing memory for a MION allowing you to write as
67/// dumps happen.
68///
69/// ## Errors
70///
71/// - If we cannot encode the parameters as a form url encoded.
72/// - If we cannot make the HTTP request.
73/// - If the server does not respond with a 200.
74/// - If we cannot read the body from HTTP.
75/// - If we cannot parse the HTML response.
76pub async fn dump_memory_with_writer<FnTy>(
77	mion_ip: Ipv4Addr,
78	resume_at: Option<usize>,
79	early_stop_at: Option<usize>,
80	callback: FnTy,
81) -> Result<(), CatBridgeError>
82where
83	FnTy: FnMut(Vec<u8>) + Send + Sync,
84{
85	dump_memory_with_raw_client(
86		&Client::default(),
87		mion_ip,
88		resume_at,
89		early_stop_at,
90		callback,
91	)
92	.await
93}
94
95/// Perform a memory dump request, but with an already existing HTTP client.
96///
97/// ## Errors
98///
99/// - If we cannot encode the parameters as a form url encoded.
100/// - If we cannot make the HTTP request.
101/// - If the server does not respond with a 200.
102/// - If we cannot read the body from HTTP.
103/// - If we cannot parse the HTML response.
104pub async fn dump_memory_with_raw_client<FnTy>(
105	client: &Client,
106	mion_ip: Ipv4Addr,
107	resume_at: Option<usize>,
108	early_stop_at: Option<usize>,
109	buff_callback: FnTy,
110) -> Result<(), CatBridgeError>
111where
112	FnTy: FnMut(Vec<u8>) + Send + Sync,
113{
114	let (stop_requests_sender, mut stop_requests_consumer) = bounded_channel(1);
115	let retry_counter = AtomicU8::new(0);
116
117	let start_address = resume_at.unwrap_or(0);
118	let (page_results_sender, page_results_consumer) = bounded_channel(512);
119
120	let retry_counter_ref = &retry_counter;
121	let sender_ref = &page_results_sender;
122	// This will make memory page requests concurrently in chunks of
123	// `MAX_MEMORY_CONCURRENCY`, with each task handling it's own retry
124	// logic.
125	//
126	// If requests start throwing errors, the receiving channel will send
127	// a message to `stop_requests_sender` which will shut everything down.
128	let buffered_stream_future = futures::stream::iter(
129		(start_address..=early_stop_at.unwrap_or(MEMORY_MAX_ADDRESS)).step_by(512),
130	)
131	.map(|page_start| async move {
132		loop {
133			if !do_memory_page_fetch(client, mion_ip, page_start, retry_counter_ref, sender_ref)
134				.await
135			{
136				break;
137			}
138		}
139	})
140	.buffered(MAX_MEMORY_CONCURRENCY)
141	.collect::<Vec<()>>();
142
143	// As requests finish, they may not necissarily be in order.
144	// We need to reorder them to ensure they're called back in a
145	// serial order.
146	let join_handle = do_memory_page_ordering(
147		page_results_consumer,
148		stop_requests_sender,
149		start_address,
150		buff_callback,
151	);
152
153	{
154		let recv_future = stop_requests_consumer.recv();
155
156		futures::pin_mut!(buffered_stream_future);
157		futures::pin_mut!(recv_future);
158
159		let (select, _joined) = futures::future::join(
160			// Wait for all requests to finish, or a stop signal caused by an error.
161			futures::future::select(buffered_stream_future, recv_future),
162			// Wait for all of our ordering to finish too.
163			join_handle,
164		)
165		.await;
166		match select {
167			Either::Right((error, _)) => {
168				if let Some(cause) = error {
169					return Err(cause);
170				}
171			}
172			Either::Left(_) => {}
173		}
174	}
175
176	// Double check we didn't get an error ordering all the pages.
177	if let Ok(cause) = stop_requests_consumer.try_recv() {
178		return Err(cause);
179	}
180
181	Ok(())
182}
183
184async fn do_memory_page_ordering<FnTy>(
185	mut results: BoundedReceiver<Result<(usize, Vec<u8>), CatBridgeError>>,
186	stopper: BoundedSender<CatBridgeError>,
187	start_at: usize,
188	mut callback: FnTy,
189) where
190	FnTy: FnMut(Vec<u8>),
191{
192	let mut out_of_order_cache: FnvHashMap<usize, Vec<u8>> = FnvHashMap::default();
193	let mut looking_for_page = start_at;
194
195	while looking_for_page <= MEMORY_MAX_ADDRESS {
196		if let Some(data) = out_of_order_cache.remove(&looking_for_page) {
197			callback(data);
198			looking_for_page += 512;
199			continue;
200		}
201
202		let Some(page_result) = results.recv().await else {
203			_ = stopper.send(CatBridgeError::ClosedChannel).await;
204			break;
205		};
206		match page_result {
207			Ok((addr, data)) => {
208				out_of_order_cache.insert(addr, data);
209			}
210			Err(cause) => {
211				_ = stopper.send(cause).await;
212				break;
213			}
214		}
215	}
216}
217
218async fn do_memory_page_fetch(
219	client: &Client,
220	mion_ip: Ipv4Addr,
221	page_start: usize,
222	retry_counter: &AtomicU8,
223	result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
224) -> bool {
225	let start_addr = format!("{page_start:08X}");
226	debug!(
227		bridge.ip = %mion_ip,
228		addr = %start_addr,
229		"Performing memory page fetch",
230	);
231
232	let timeout_response = timeout(
233		Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
234		do_raw_memory_request(client, mion_ip, &[("start_addr", start_addr)]),
235	)
236	.await;
237
238	let Ok(potential_response) = timeout_response else {
239		if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
240			_ = result_stream
241				.send(Err(NetworkError::Timeout(Duration::from_secs(
242					MEMORY_TIMEOUT_SECONDS,
243				))
244				.into()))
245				.await;
246			return false;
247		}
248		debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
249		tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
250		return true;
251	};
252	let response = match potential_response {
253		Ok(value) => value,
254		Err(cause) => {
255			if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
256				_ = result_stream.send(Err(cause.into())).await;
257				return false;
258			}
259			debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
260			tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
261			return true;
262		}
263	};
264
265	let status = response.status().as_u16();
266	let timeout_body_result = timeout(
267		Duration::from_secs(MEMORY_TIMEOUT_SECONDS),
268		response.bytes(),
269	)
270	.await;
271	let Ok(body_result) = timeout_body_result else {
272		if retry_counter.fetch_add(1, AtomicOrdering::AcqRel) > MAX_RETRIES {
273			_ = result_stream
274				.send(Err(NetworkError::Timeout(Duration::from_secs(
275					MEMORY_TIMEOUT_SECONDS,
276				))
277				.into()))
278				.await;
279			return false;
280		}
281		debug!(bridge.ip = %mion_ip, "Slamming Memory dump too hard... backing off for a bit");
282		tokio::time::sleep(Duration::from_secs(BACKOFF_SLEEP_SECONDS)).await;
283		return true;
284	};
285
286	retry_counter.store(0, AtomicOrdering::Release);
287	if status != 200 {
288		if let Ok(body) = body_result {
289			_ = result_stream
290				.send(Err(MIONCGIErrors::UnexpectedStatusCode(status, body).into()))
291				.await;
292			return false;
293		}
294
295		_ = result_stream
296			.send(Err(MIONCGIErrors::UnexpectedStatusCodeNoBody(status).into()))
297			.await;
298		return false;
299	}
300	let read_body_bytes = match body_result.map_err(NetworkError::HTTP) {
301		Ok(value) => value,
302		Err(cause) => {
303			_ = result_stream.send(Err(cause.into())).await;
304			return false;
305		}
306	};
307	let body_as_string =
308		match String::from_utf8(read_body_bytes.into()).map_err(NetworkParseError::Utf8Expected) {
309			Ok(value) => value,
310			Err(cause) => {
311				_ = result_stream.send(Err(cause.into())).await;
312				return false;
313			}
314		};
315
316	process_received_page(page_start, result_stream, &body_as_string).await
317}
318
319async fn process_received_page(
320	page_start: usize,
321	result_stream: &BoundedSender<Result<(usize, Vec<u8>), CatBridgeError>>,
322	body_as_string: &str,
323) -> bool {
324	let table = match extract_memory_table_body(body_as_string) {
325		Ok(value) => value,
326		Err(cause) => {
327			_ = result_stream.send(Err(cause.into())).await;
328			return false;
329		}
330	};
331	let mut page_of_bytes = Vec::with_capacity(512);
332	for table_row in table.split("<tr>").skip(3) {
333		for table_column in table_row
334			.trim()
335			.trim_end_matches("</tbody>")
336			.trim_end()
337			.trim_end_matches("</tr>")
338			.trim_end()
339			.replace("</td>", "")
340			.split("<td>")
341			.skip(3)
342		{
343			if table_column.trim().len() != 2 {
344				_ = result_stream
345					.send(Err(MIONCGIErrors::HtmlResponseBadByte(
346						table_column.to_owned(),
347					)
348					.into()))
349					.await;
350				return false;
351			}
352			let byte = match u8::from_str_radix(table_column.trim(), 16)
353				.map_err(|_| MIONCGIErrors::HtmlResponseBadByte(table_column.to_owned()))
354			{
355				Ok(value) => value,
356				Err(cause) => {
357					_ = result_stream.send(Err(cause.into())).await;
358					return false;
359				}
360			};
361			page_of_bytes.push(byte);
362		}
363	}
364
365	_ = result_stream.send(Ok((page_start, page_of_bytes))).await;
366	false
367}
368
369fn extract_memory_table_body(body: &str) -> Result<String, MIONCGIErrors> {
370	let start = body
371		.find(TABLE_START_SIGIL)
372		.ok_or_else(|| MIONCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
373	let body_minus_start = &body[start + TABLE_START_SIGIL.len()..];
374	let end = body_minus_start
375		.find(TABLE_END_SIGIL)
376		.ok_or_else(|| MIONCGIErrors::HtmlResponseMissingMemoryDumpSigil(body.to_owned()))?;
377
378	Ok(body_minus_start[..end].to_owned())
379}
380
381/// Perform a raw request on the MION board's `eeprom_dump.cgi` page.
382///
383/// *note: you probably want to call one of the actual methods, as this is
384/// basically just a thin wrapper around an HTTP Post Request. Not doing much
385/// else more. A lot of it requires that you set things up correctly.*
386///
387/// ## Errors
388///
389/// - If we cannot make an HTTP request to the MION Request.
390/// - If we fail to encode your parameters into a request body.
391pub async fn do_raw_memory_request<UrlEncodableType>(
392	client: &Client,
393	mion_ip: Ipv4Addr,
394	url_parameters: UrlEncodableType,
395) -> Result<Response, NetworkError>
396where
397	UrlEncodableType: Serialize,
398{
399	Ok(client
400		.post(format!("http://{mion_ip}/dbg/mem_dump.cgi"))
401		.version(Version::HTTP_11)
402		.header("authorization", format!("Basic {AUTHZ_HEADER}"))
403		.header("content-type", "application/x-www-form-urlencoded")
404		.header(
405			"user-agent",
406			format!("cat-dev/{}", env!("CARGO_PKG_VERSION")),
407		)
408		.body::<String>(
409			serde_urlencoded::to_string(&url_parameters)
410				.map_err(MIONCGIErrors::FormDataEncodeError)?,
411		)
412		.send()
413		.await?)
414}