corevm_host/
streams.rs

1use alloc::{borrow::Cow, vec::Vec};
2use codec::{Compact, CompactLen, ConstEncodedLen, CountedInput, Decode, Encode, MaxEncodedLen};
3use jam_types::{Segment, SEGMENT_LEN};
4
5/// Output stream identifier.
6#[derive(Encode, Decode, MaxEncodedLen, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
7pub enum OutputStream {
8	Stdout = 0,
9	Stderr = 1,
10	Video = 2,
11	Audio = 3,
12}
13
14impl OutputStream {
15	/// Total no. of output streams.
16	pub const COUNT: usize = 4;
17
18	/// All output streams.
19	pub const ALL: [OutputStream; Self::COUNT] = {
20		use OutputStream::*;
21		[Stdout, Stderr, Video, Audio]
22	};
23}
24
25impl ConstEncodedLen for OutputStream {}
26
/// A piece of console output together with the time it was produced.
#[derive(Debug)]
pub struct ConsoleChunk {
	/// Offset from the start of the slot in milliseconds.
	pub time_offset: u64,
	/// Raw bytes of the output.
	pub buf: Vec<u8>,
}
34
35/// The size of the chunk encoded as a part of [`ConsoleChunks`] (with delta encoding).
36fn delta_encoded_size(time_offset: u64, len: usize, prev_time_offset: u64) -> usize {
37	Compact::compact_len(&time_offset.wrapping_sub(prev_time_offset)) +
38		Compact::compact_len(&(len as u64)) +
39		len
40}
41
42/// Several chunks of console output.
43#[derive(Debug)]
44pub struct ConsoleChunks {
45	chunks: Vec<ConsoleChunk>,
46	/// Encoded size without the chunks vector length.
47	///
48	/// This field is computed automatically.
49	encoded_size: usize,
50}
51
52impl ConsoleChunks {
53	pub const fn new() -> Self {
54		Self { chunks: Vec::new(), encoded_size: 0 }
55	}
56
57	pub fn clear(&mut self) {
58		self.chunks.clear();
59		self.encoded_size = 0;
60	}
61
62	pub fn into_inner(self) -> Vec<ConsoleChunk> {
63		self.chunks
64	}
65
66	/// Append new console buffer.
67	///
68	/// If the supplied time offset matches the one of the last chunk, then this chunk is
69	/// extended; otherwise a new chunk with the matching time offset is appended.
70	pub fn append(&mut self, time_offset: u64, buf: Cow<'_, [u8]>) {
71		match self.chunks.last_mut() {
72			Some(chunk) if chunk.time_offset == time_offset => {
73				self.encoded_size -= Compact::compact_len(&(chunk.buf.len() as u64));
74				chunk.buf.extend_from_slice(buf.as_ref());
75				self.encoded_size += Compact::compact_len(&(chunk.buf.len() as u64));
76				self.encoded_size += buf.len();
77			},
78			_ => self.push(ConsoleChunk { time_offset, buf: buf.into_owned() }),
79		}
80	}
81
82	/// Pre-allocate `len` bytes in the last chunk.
83	///
84	/// Appends a new chunk if there are no chunks or if the time offset of the last chunk doesn't
85	/// match the supplied time offset.
86	///
87	/// Panics on length overflow.
88	#[must_use]
89	pub fn pre_allocate(&mut self, time_offset: u64, len: usize) -> &mut [u8] {
90		match self.chunks.last_mut() {
91			Some(chunk) if chunk.time_offset == time_offset => {},
92			_ => self.push(ConsoleChunk { time_offset, buf: Vec::new() }),
93		}
94		let buf = &mut self.chunks.last_mut().expect("Initialized above").buf;
95		self.encoded_size -= Compact::compact_len(&(buf.len() as u64));
96		let offset = buf.len();
97		buf.resize(offset + len, 0_u8);
98		self.encoded_size += Compact::compact_len(&(buf.len() as u64));
99		self.encoded_size += len;
100		&mut buf[offset..]
101	}
102
103	fn push(&mut self, chunk: ConsoleChunk) {
104		let prev_time_offset = self
105			.chunks
106			.last()
107			.map(|last_chunk| {
108				debug_assert!(
109					last_chunk.time_offset < chunk.time_offset,
110					"Console chunks are not monotonic: last chunk = {}, new chunk = {}",
111					last_chunk.time_offset,
112					chunk.time_offset
113				);
114				last_chunk.time_offset
115			})
116			.unwrap_or(0);
117		self.encoded_size +=
118			delta_encoded_size(chunk.time_offset, chunk.buf.len(), prev_time_offset);
119		self.chunks.push(chunk);
120	}
121
122	/// Returns the encoded size after a chunk with the supplied offset and having `len` bytes is
123	/// appended.
124	///
125	/// Returns `None` on overflow.
126	pub fn encoded_size_after(&self, time_offset: u64, len: usize) -> Option<usize> {
127		let mut encoded_size = self.encoded_size;
128		let mut chunks_len = self.chunks.len();
129		match self.chunks.last() {
130			Some(chunk) if chunk.time_offset == time_offset => {
131				encoded_size -= Compact::compact_len(&(chunk.buf.len() as u64));
132				encoded_size += Compact::compact_len(&(chunk.buf.len().checked_add(len)? as u64));
133			},
134			last => {
135				let prev_time_offset = last.map(|chunk| chunk.time_offset).unwrap_or(0);
136				if time_offset < prev_time_offset {
137					// Non-monotonic.
138					return None;
139				}
140				encoded_size += Compact::compact_len(&time_offset.wrapping_sub(prev_time_offset));
141				encoded_size += Compact::compact_len(&(len as u64));
142				chunks_len += 1;
143			},
144		}
145		// Plus encoded size of the chunks vector length.
146		Some(encoded_size.checked_add(len)? + Compact::compact_len(&(chunks_len as u64)))
147	}
148}
149
150impl Default for ConsoleChunks {
151	fn default() -> Self {
152		Self::new()
153	}
154}
155
156impl core::ops::Deref for ConsoleChunks {
157	type Target = [ConsoleChunk];
158
159	fn deref(&self) -> &Self::Target {
160		&self.chunks[..]
161	}
162}
163
164impl<'a> FromIterator<(u64, Cow<'a, [u8]>)> for ConsoleChunks {
165	fn from_iter<I: IntoIterator<Item = (u64, Cow<'a, [u8]>)>>(iter: I) -> Self {
166		let mut chunks = Self::new();
167		chunks.extend(iter);
168		chunks
169	}
170}
171
172impl<'a> Extend<(u64, Cow<'a, [u8]>)> for ConsoleChunks {
173	fn extend<I: IntoIterator<Item = (u64, Cow<'a, [u8]>)>>(&mut self, chunks: I) {
174		for (time_offset, buf) in chunks.into_iter() {
175			self.append(time_offset, buf);
176		}
177	}
178}
179
180impl FromIterator<ConsoleChunk> for ConsoleChunks {
181	fn from_iter<I: IntoIterator<Item = ConsoleChunk>>(iter: I) -> Self {
182		let mut chunks = Self::new();
183		chunks.extend(iter);
184		chunks
185	}
186}
187
188impl Extend<ConsoleChunk> for ConsoleChunks {
189	fn extend<I: IntoIterator<Item = ConsoleChunk>>(&mut self, chunks: I) {
190		for ConsoleChunk { time_offset, buf } in chunks.into_iter() {
191			self.append(time_offset, buf.into());
192		}
193	}
194}
195
196impl<'a> FromIterator<&'a ConsoleChunk> for ConsoleChunks {
197	fn from_iter<I: IntoIterator<Item = &'a ConsoleChunk>>(iter: I) -> Self {
198		let mut chunks = Self::new();
199		chunks.extend(iter);
200		chunks
201	}
202}
203
204impl<'a> Extend<&'a ConsoleChunk> for ConsoleChunks {
205	fn extend<I: IntoIterator<Item = &'a ConsoleChunk>>(&mut self, chunks: I) {
206		for ConsoleChunk { time_offset, buf } in chunks.into_iter() {
207			self.append(*time_offset, buf.into());
208		}
209	}
210}
211
212impl Encode for ConsoleChunks {
213	fn encode_to<T: codec::Output + ?Sized>(&self, output: &mut T) {
214		// Use delta encoding to reduce the encoded size of the time offsets.
215		let mut prev = 0;
216		Compact(self.chunks.len() as u64).encode_to(output);
217		for ConsoleChunk { time_offset, buf } in self.chunks.iter() {
218			Compact(time_offset.wrapping_sub(prev)).encode_to(output);
219			buf.encode_to(output);
220			prev = *time_offset;
221		}
222	}
223
224	fn encoded_size(&self) -> usize {
225		// Plus encoded size of the chunks vector length.
226		self.encoded_size + Compact::compact_len(&(self.chunks.len() as u64))
227	}
228}
229
230impl Decode for ConsoleChunks {
231	fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
232		let mut input = CountedInput::new(input);
233		let len = Compact::<u64>::decode(&mut input)?.0 as usize;
234		let mut chunks = Vec::with_capacity(len);
235		let mut prev = 0;
236		for _ in 0..len {
237			let time_offset = Compact::<u64>::decode(&mut input)?.0.wrapping_add(prev);
238			let buf: Vec<u8> = Decode::decode(&mut input)?;
239			chunks.push(ConsoleChunk { time_offset, buf });
240			prev = time_offset;
241		}
242		// Minus encoded size of the chunks vector length.
243		let encoded_size = input.count() as usize - Compact::compact_len(&(chunks.len() as u64));
244		Ok(Self { chunks, encoded_size })
245	}
246}
247
248/// CoreVM output stream buffers.
249///
250/// Use this struct to collect output from the service.
251#[derive(Default)]
252pub struct OutputBuffers {
253	console_buffers: [ConsoleChunks; 2],
254	buffers: [Vec<u8>; 2],
255}
256
257impl OutputBuffers {
258	/// Create output buffers from the output streams stored in `segments`.
259	///
260	/// Each stream size is specified in `stream_len` array.
261	///
262	/// The data should start at the beginning of the first segment and end somewhere in the last
263	/// segment. The total length equals the sum of lengths in `stream_len` array.
264	pub fn from_segments(
265		segments: &[Segment],
266		stream_len: &[u32; OutputStream::COUNT],
267	) -> Result<Self, codec::Error> {
268		let total_len: usize = stream_len.iter().map(|len| *len as usize).sum();
269		let mut input = SegmentedInput::new(segments, 0, total_len);
270		let mut buffers = Self::default();
271		for (src_len, stream) in stream_len.iter().zip(OutputStream::ALL) {
272			if *src_len == 0 {
273				continue;
274			}
275			let src_len = *src_len as usize;
276			debug_assert!(
277				src_len <= input.end - input.offset,
278				"src len = {src_len}, remaining len = {}",
279				input.end - input.offset
280			);
281			buffers.set_stream(stream, src_len, &mut input)?;
282		}
283		Ok(buffers)
284	}
285
286	/// Create output buffers from the specified output stream stored in `segments`.
287	pub fn from_segments_one(
288		segments: &[Segment],
289		stream: OutputStream,
290		stream_start: usize,
291		stream_end: usize,
292	) -> Result<Self, codec::Error> {
293		let src_len = stream_end - stream_start;
294		let mut input = SegmentedInput::new(segments, stream_start, stream_end);
295		let mut buffers = Self::default();
296		debug_assert!(
297			src_len <= input.end - input.offset,
298			"src len = {src_len}, remaining len = {}",
299			input.end - input.offset
300		);
301		buffers.set_stream(stream, src_len, &mut input)?;
302		Ok(buffers)
303	}
304
305	fn set_stream(
306		&mut self,
307		i: OutputStream,
308		src_len: usize,
309		input: &mut SegmentedInput<'_>,
310	) -> Result<(), codec::Error> {
311		use OutputStream::*;
312		match i {
313			Stdout | Stderr => {
314				let old_offset = input.offset;
315				let chunks = ConsoleChunks::decode(input)?;
316				if input.offset - old_offset != src_len {
317					return Err("Invalid console chunks size".into());
318				}
319				self.console_buffers[i as usize] = chunks;
320			},
321			Video | Audio => {
322				let dst = &mut self.buffers[i as usize - 2];
323				let dst_offset = dst.len();
324				dst.resize(dst_offset + src_len, 0_u8);
325				input.read(&mut dst[dst_offset..]);
326			},
327		}
328		Ok(())
329	}
330
331	/// Get decoded stdout/stderr buffer.
332	pub fn get_console_buf(&self, i: OutputStream) -> &[ConsoleChunk] {
333		assert!(matches!(i, OutputStream::Stdout | OutputStream::Stderr));
334		&self.console_buffers[i as usize][..]
335	}
336
337	/// Take decoded stdout/stderr buffer.
338	pub fn take_console_buf(&mut self, i: OutputStream) -> Vec<ConsoleChunk> {
339		assert!(matches!(i, OutputStream::Stdout | OutputStream::Stderr));
340		core::mem::take(&mut self.console_buffers[i as usize]).into_inner()
341	}
342
343	/// Get encoded video/audio buffer.
344	pub fn get_encoded_buf(&self, i: OutputStream) -> &[u8] {
345		assert!(matches!(i, OutputStream::Video | OutputStream::Audio));
346		&self.buffers[i as usize - 2]
347	}
348
349	/// Take encoded video/audio buffer.
350	pub fn take_encoded_buf(&mut self, i: OutputStream) -> Vec<u8> {
351		assert!(matches!(i, OutputStream::Video | OutputStream::Audio));
352		core::mem::take(&mut self.buffers[i as usize - 2])
353	}
354
355	/// Clear all buffers.
356	pub fn clear(&mut self) {
357		for buf in self.console_buffers.iter_mut() {
358			buf.clear();
359		}
360		for buf in self.buffers.iter_mut() {
361			buf.clear();
362		}
363	}
364}
365
366/// A reader that reads directly from segments.
367///
368/// This avoids unnecessary copying potentially large amount of data.
369#[derive(Debug)]
370struct SegmentedInput<'a> {
371	segments: &'a [Segment],
372	end: usize,
373	offset: usize,
374}
375
376impl<'a> SegmentedInput<'a> {
377	fn new(segments: &'a [Segment], start: usize, end: usize) -> Self {
378		Self { segments, end, offset: start }
379	}
380
381	fn read(&mut self, dest: &mut [u8]) {
382		let dest_len = dest.len();
383		let mut dest_offset = 0;
384		while dest_offset != dest_len {
385			let i = self.offset / SEGMENT_LEN;
386			let src_offset = self.offset % SEGMENT_LEN;
387			let n = (SEGMENT_LEN - src_offset).min(dest_len - dest_offset);
388			dest[dest_offset..dest_offset + n]
389				.copy_from_slice(&self.segments[i][src_offset..src_offset + n]);
390			dest_offset += n;
391			self.offset += n;
392		}
393	}
394}
395
396impl codec::Input for SegmentedInput<'_> {
397	fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
398		Ok(Some(self.end - self.offset))
399	}
400
401	fn read(&mut self, dest: &mut [u8]) -> Result<(), codec::Error> {
402		SegmentedInput::read(self, dest);
403		Ok(())
404	}
405}
406
407#[cfg(test)]
408mod tests {
409	use super::*;
410	use alloc::{vec, vec::Vec};
411	use rand::Rng;
412
413	#[test]
414	fn segmented_input_works() {
415		#[derive(Encode, Decode, Debug, PartialEq, Eq)]
416		struct Dummy {
417			x: u64,
418			y: u32,
419			z: Vec<Dummy>,
420		}
421		let mut rng = rand::rng();
422		let num_values = rng.random_range(0..1000);
423		let mut values: Vec<Dummy> = Vec::with_capacity(num_values);
424		for _ in 0..num_values {
425			let z_len = rng.random_range(0..10);
426			let mut z = Vec::with_capacity(z_len);
427			for _ in 0..z_len {
428				z.push(Dummy { x: rng.random(), y: rng.random(), z: Vec::new() });
429			}
430			values.push(Dummy { x: rng.random(), y: rng.random(), z });
431		}
432		let mut encoded = values.encode();
433		let encoded_len = encoded.len();
434		while !encoded.len().is_multiple_of(SEGMENT_LEN) {
435			encoded.push(0_u8);
436		}
437		let segments = encoded
438			.chunks(SEGMENT_LEN)
439			.map(|slice| slice.to_vec().try_into().unwrap())
440			.collect::<Vec<Segment>>();
441		let mut input = SegmentedInput::new(&segments, 0, encoded_len);
442		let actual_values: Vec<Dummy> = Decode::decode(&mut input).unwrap();
443		assert_eq!(values, actual_values);
444	}
445
446	#[test]
447	fn console_chunks_encoded_size_works() {
448		macro_rules! check {
449			($chunks: expr) => {{
450				let chunks = $chunks;
451				assert_eq!(chunks.encode().len(), chunks.encoded_size(), "Chunks = {chunks:?}");
452			}};
453		}
454		check!(ConsoleChunks::default());
455		check!({
456			let mut chunks = ConsoleChunks::new();
457			chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
458			chunks
459		});
460		check!({
461			let mut chunks = ConsoleChunks::new();
462			chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 0] });
463			chunks
464		});
465		check!({
466			let mut chunks = ConsoleChunks::new();
467			chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
468			chunks.append(99999, vec![0_u8; 123].into());
469			chunks
470		});
471		check!({
472			let mut chunks = ConsoleChunks::new();
473			chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
474			chunks.append(100_000, vec![0_u8; 123].into());
475			chunks
476		});
477		check!({
478			let mut chunks = ConsoleChunks::new();
479			chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
480			let _ = chunks.pre_allocate(100_000, 123);
481			chunks
482		});
483		check!({
484			let mut chunks = ConsoleChunks::new();
485			chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
486			let _ = chunks.pre_allocate(100_000, 123);
487			let bytes = chunks.encode();
488			ConsoleChunks::decode(&mut &bytes[..]).unwrap()
489		});
490	}
491
492	#[test]
493	fn console_chunks_encoded_size_after_works() {
494		let mut chunks = ConsoleChunks::new();
495		let expected_encoded_size = chunks.encoded_size_after(99999, 123).unwrap();
496		chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
497		assert_eq!(expected_encoded_size, chunks.encode().len());
498	}
499}