// corevm_host/streams.rs

1extern crate alloc;
2
3use alloc::vec::Vec;
4use codec::{ConstEncodedLen, Decode, Encode, MaxEncodedLen};
5use jam_types::{Segment, SEGMENT_LEN};
6
7/// Output stream identifier.
8#[derive(Encode, Decode, MaxEncodedLen, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
9pub enum OutputStream {
10	Stdout = 1,
11	Stderr = 2,
12	Video = 3,
13	Audio = 4,
14}
15
16impl OutputStream {
17	/// Total no. of output streams.
18	pub const COUNT: usize = 4;
19
20	/// All output streams.
21	pub const ALL: [OutputStream; 4] = {
22		use OutputStream::*;
23		[Stdout, Stderr, Video, Audio]
24	};
25}
26
27impl ConstEncodedLen for OutputStream {}
28
29/// CoreVM output stream buffers (builder counterpart).
30///
31/// The builder uses this struct to collect output from the service.
32///
33/// The [service counterpart](corevm::OutputBuffers).
34#[derive(Default)]
35pub struct OutputBuffers {
36	buffers: [Vec<u8>; OutputStream::COUNT],
37}
38
39impl OutputBuffers {
40	/// Append the output streams from `segments`.
41	///
42	/// Each stream size is specified in `stream_len` array.
43	///
44	/// The data starts with `segments[0][..]` and ends with
45	/// `segments[segments.len() - 1][..total_stream_len % SEGMENT_LEN]` when `sum_stream_len > 0`.
46	/// Here `total_stream_len` is the total size of all streams.
47	pub fn append_from_segments(
48		&mut self,
49		segments: &[Segment],
50		stream_len: &[u32; OutputStream::COUNT],
51	) {
52		let total_len: usize = stream_len.iter().map(|len| *len as usize).sum();
53		let mut input = SegmentedInput::new(segments, 0, total_len);
54		for (src_len, stream) in stream_len.iter().zip(OutputStream::ALL) {
55			if *src_len == 0 {
56				continue;
57			}
58			let src_len = *src_len as usize;
59			debug_assert!(
60				src_len <= input.end - input.offset,
61				"src len = {src_len}, remaining len = {}",
62				input.end - input.offset
63			);
64			let dst = self.get_mut(stream);
65			let dst_offset = dst.len();
66			dst.resize(dst_offset + src_len, 0_u8);
67			input.read(&mut dst[dst_offset..]);
68		}
69	}
70
71	/// Append specified output stream from `segments`.
72	pub fn append_from_segments_one(
73		&mut self,
74		segments: &[Segment],
75		stream: OutputStream,
76		stream_start: usize,
77		stream_end: usize,
78	) {
79		let src_len = stream_end - stream_start;
80		let mut input = SegmentedInput::new(segments, stream_start, stream_end);
81		debug_assert!(
82			src_len <= input.end - input.offset,
83			"src len = {src_len}, remaining len = {}",
84			input.end - input.offset
85		);
86		let dst = self.get_mut(stream);
87		let dst_offset = dst.len();
88		dst.resize(dst_offset + src_len, 0_u8);
89		input.read(&mut dst[dst_offset..]);
90	}
91
92	/// Get immutable slice of buffer `i`.
93	pub fn get(&self, i: OutputStream) -> &[u8] {
94		&self.buffers[i as usize - 1]
95	}
96
97	/// Get mutable slice of buffer `i`.
98	pub fn get_mut(&mut self, i: OutputStream) -> &mut Vec<u8> {
99		&mut self.buffers[i as usize - 1]
100	}
101
102	/// Clear all buffers.
103	pub fn clear(&mut self) {
104		for buf in self.buffers.iter_mut() {
105			buf.clear();
106		}
107	}
108}
109
110/// A reader that reads directly from segments.
111///
112/// This avoids unnecessary copying potentially large amount of data.
113#[derive(Debug)]
114struct SegmentedInput<'a> {
115	segments: &'a [Segment],
116	end: usize,
117	offset: usize,
118}
119
120impl<'a> SegmentedInput<'a> {
121	fn new(segments: &'a [Segment], start: usize, end: usize) -> Self {
122		Self { segments, end, offset: start }
123	}
124
125	fn read(&mut self, dest: &mut [u8]) {
126		let dest_len = dest.len();
127		let mut dest_offset = 0;
128		while dest_offset != dest_len {
129			let i = self.offset / SEGMENT_LEN;
130			let src_offset = self.offset % SEGMENT_LEN;
131			let n = (SEGMENT_LEN - src_offset).min(dest_len - dest_offset);
132			dest[dest_offset..dest_offset + n]
133				.copy_from_slice(&self.segments[i][src_offset..src_offset + n]);
134			dest_offset += n;
135			self.offset += n;
136		}
137	}
138}
139
#[cfg(test)]
mod tests {
	use super::*;
	use alloc::vec::Vec;
	use codec::{Decode, Encode};
	use rand::Rng;

	// Test-only adapter that lets the codec decode values directly from a `SegmentedInput`.
	impl codec::Input for SegmentedInput<'_> {
		fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
			Ok(Some(self.end.saturating_sub(self.offset)))
		}

		fn read(&mut self, dest: &mut [u8]) -> Result<(), codec::Error> {
			// Report a short input as a decoding error instead of silently reading the
			// padding after `end` (or panicking past the last segment).
			if dest.len() > self.end.saturating_sub(self.offset) {
				return Err("Not enough data to fill buffer".into());
			}
			SegmentedInput::read(self, dest);
			Ok(())
		}
	}

	/// Round-trips randomly generated values through segments and `SegmentedInput`.
	#[test]
	fn segmented_input_works() {
		#[derive(Encode, Decode, Debug, PartialEq, Eq)]
		struct Dummy {
			x: u64,
			y: u32,
			z: Vec<Dummy>,
		}
		let mut rng = rand::rng();
		let num_values = rng.random_range(0..1000);
		let mut values: Vec<Dummy> = Vec::with_capacity(num_values);
		for _ in 0..num_values {
			let z_len = rng.random_range(0..10);
			let mut z = Vec::with_capacity(z_len);
			for _ in 0..z_len {
				z.push(Dummy { x: rng.random(), y: rng.random(), z: Vec::new() });
			}
			values.push(Dummy { x: rng.random(), y: rng.random(), z });
		}
		let mut encoded = values.encode();
		let encoded_len = encoded.len();
		// Pad with zeroes so the data splits into whole segments.
		while encoded.len() % SEGMENT_LEN != 0 {
			encoded.push(0_u8);
		}
		let segments = encoded
			.chunks(SEGMENT_LEN)
			.map(|slice| slice.to_vec().try_into().unwrap())
			.collect::<Vec<Segment>>();
		let mut input = SegmentedInput::new(&segments, 0, encoded_len);
		let actual_values: Vec<Dummy> = Decode::decode(&mut input).unwrap();
		assert_eq!(values, actual_values);
	}

	/// Decoding more bytes than `end` allows must fail rather than read the padding.
	#[test]
	fn segmented_input_rejects_reads_past_end() {
		let mut bytes: Vec<u8> = Vec::new();
		bytes.resize(SEGMENT_LEN, 0_u8);
		let segments: [Segment; 1] = [bytes.try_into().unwrap()];

		// Eight readable bytes: an eight-byte array decodes fine.
		let mut input = SegmentedInput::new(&segments, 0, 8);
		assert_eq!(<[u8; 8]>::decode(&mut input).unwrap(), [0_u8; 8]);

		// Only four readable bytes: the same decode must report an error.
		let mut input = SegmentedInput::new(&segments, 0, 4);
		assert!(<[u8; 8]>::decode(&mut input).is_err());
	}
}