1use alloc::{borrow::Cow, vec::Vec};
2use codec::{Compact, CompactLen, ConstEncodedLen, CountedInput, Decode, Encode, MaxEncodedLen};
3use jam_types::{Segment, SEGMENT_LEN};
4
5#[derive(Encode, Decode, MaxEncodedLen, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
7pub enum OutputStream {
8 Stdout = 0,
9 Stderr = 1,
10 Video = 2,
11 Audio = 3,
12}
13
14impl OutputStream {
15 pub const COUNT: usize = 4;
17
18 pub const ALL: [OutputStream; Self::COUNT] = {
20 use OutputStream::*;
21 [Stdout, Stderr, Video, Audio]
22 };
23}
24
25impl ConstEncodedLen for OutputStream {}
26
27#[derive(Debug)]
29pub struct ConsoleChunk {
30 pub time_offset: u64,
32 pub buf: Vec<u8>,
33}
34
35fn delta_encoded_size(time_offset: u64, len: usize, prev_time_offset: u64) -> usize {
37 Compact::compact_len(&time_offset.wrapping_sub(prev_time_offset)) +
38 Compact::compact_len(&(len as u64)) +
39 len
40}
41
42#[derive(Debug)]
44pub struct ConsoleChunks {
45 chunks: Vec<ConsoleChunk>,
46 encoded_size: usize,
50}
51
52impl ConsoleChunks {
53 pub const fn new() -> Self {
54 Self { chunks: Vec::new(), encoded_size: 0 }
55 }
56
57 pub fn clear(&mut self) {
58 self.chunks.clear();
59 self.encoded_size = 0;
60 }
61
62 pub fn into_inner(self) -> Vec<ConsoleChunk> {
63 self.chunks
64 }
65
66 pub fn append(&mut self, time_offset: u64, buf: Cow<'_, [u8]>) {
71 match self.chunks.last_mut() {
72 Some(chunk) if chunk.time_offset == time_offset => {
73 self.encoded_size -= Compact::compact_len(&(chunk.buf.len() as u64));
74 chunk.buf.extend_from_slice(buf.as_ref());
75 self.encoded_size += Compact::compact_len(&(chunk.buf.len() as u64));
76 self.encoded_size += buf.len();
77 },
78 _ => self.push(ConsoleChunk { time_offset, buf: buf.into_owned() }),
79 }
80 }
81
82 #[must_use]
89 pub fn pre_allocate(&mut self, time_offset: u64, len: usize) -> &mut [u8] {
90 match self.chunks.last_mut() {
91 Some(chunk) if chunk.time_offset == time_offset => {},
92 _ => self.push(ConsoleChunk { time_offset, buf: Vec::new() }),
93 }
94 let buf = &mut self.chunks.last_mut().expect("Initialized above").buf;
95 self.encoded_size -= Compact::compact_len(&(buf.len() as u64));
96 let offset = buf.len();
97 buf.resize(offset + len, 0_u8);
98 self.encoded_size += Compact::compact_len(&(buf.len() as u64));
99 self.encoded_size += len;
100 &mut buf[offset..]
101 }
102
103 fn push(&mut self, chunk: ConsoleChunk) {
104 let prev_time_offset = self
105 .chunks
106 .last()
107 .map(|last_chunk| {
108 debug_assert!(
109 last_chunk.time_offset < chunk.time_offset,
110 "Console chunks are not monotonic: last chunk = {}, new chunk = {}",
111 last_chunk.time_offset,
112 chunk.time_offset
113 );
114 last_chunk.time_offset
115 })
116 .unwrap_or(0);
117 self.encoded_size +=
118 delta_encoded_size(chunk.time_offset, chunk.buf.len(), prev_time_offset);
119 self.chunks.push(chunk);
120 }
121
122 pub fn encoded_size_after(&self, time_offset: u64, len: usize) -> Option<usize> {
127 let mut encoded_size = self.encoded_size;
128 let mut chunks_len = self.chunks.len();
129 match self.chunks.last() {
130 Some(chunk) if chunk.time_offset == time_offset => {
131 encoded_size -= Compact::compact_len(&(chunk.buf.len() as u64));
132 encoded_size += Compact::compact_len(&(chunk.buf.len().checked_add(len)? as u64));
133 },
134 last => {
135 let prev_time_offset = last.map(|chunk| chunk.time_offset).unwrap_or(0);
136 if time_offset < prev_time_offset {
137 return None;
139 }
140 encoded_size += Compact::compact_len(&time_offset.wrapping_sub(prev_time_offset));
141 encoded_size += Compact::compact_len(&(len as u64));
142 chunks_len += 1;
143 },
144 }
145 Some(encoded_size.checked_add(len)? + Compact::compact_len(&(chunks_len as u64)))
147 }
148}
149
150impl Default for ConsoleChunks {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl core::ops::Deref for ConsoleChunks {
157 type Target = [ConsoleChunk];
158
159 fn deref(&self) -> &Self::Target {
160 &self.chunks[..]
161 }
162}
163
164impl<'a> FromIterator<(u64, Cow<'a, [u8]>)> for ConsoleChunks {
165 fn from_iter<I: IntoIterator<Item = (u64, Cow<'a, [u8]>)>>(iter: I) -> Self {
166 let mut chunks = Self::new();
167 chunks.extend(iter);
168 chunks
169 }
170}
171
172impl<'a> Extend<(u64, Cow<'a, [u8]>)> for ConsoleChunks {
173 fn extend<I: IntoIterator<Item = (u64, Cow<'a, [u8]>)>>(&mut self, chunks: I) {
174 for (time_offset, buf) in chunks.into_iter() {
175 self.append(time_offset, buf);
176 }
177 }
178}
179
180impl FromIterator<ConsoleChunk> for ConsoleChunks {
181 fn from_iter<I: IntoIterator<Item = ConsoleChunk>>(iter: I) -> Self {
182 let mut chunks = Self::new();
183 chunks.extend(iter);
184 chunks
185 }
186}
187
188impl Extend<ConsoleChunk> for ConsoleChunks {
189 fn extend<I: IntoIterator<Item = ConsoleChunk>>(&mut self, chunks: I) {
190 for ConsoleChunk { time_offset, buf } in chunks.into_iter() {
191 self.append(time_offset, buf.into());
192 }
193 }
194}
195
196impl<'a> FromIterator<&'a ConsoleChunk> for ConsoleChunks {
197 fn from_iter<I: IntoIterator<Item = &'a ConsoleChunk>>(iter: I) -> Self {
198 let mut chunks = Self::new();
199 chunks.extend(iter);
200 chunks
201 }
202}
203
204impl<'a> Extend<&'a ConsoleChunk> for ConsoleChunks {
205 fn extend<I: IntoIterator<Item = &'a ConsoleChunk>>(&mut self, chunks: I) {
206 for ConsoleChunk { time_offset, buf } in chunks.into_iter() {
207 self.append(*time_offset, buf.into());
208 }
209 }
210}
211
212impl Encode for ConsoleChunks {
213 fn encode_to<T: codec::Output + ?Sized>(&self, output: &mut T) {
214 let mut prev = 0;
216 Compact(self.chunks.len() as u64).encode_to(output);
217 for ConsoleChunk { time_offset, buf } in self.chunks.iter() {
218 Compact(time_offset.wrapping_sub(prev)).encode_to(output);
219 buf.encode_to(output);
220 prev = *time_offset;
221 }
222 }
223
224 fn encoded_size(&self) -> usize {
225 self.encoded_size + Compact::compact_len(&(self.chunks.len() as u64))
227 }
228}
229
230impl Decode for ConsoleChunks {
231 fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
232 let mut input = CountedInput::new(input);
233 let len = Compact::<u64>::decode(&mut input)?.0 as usize;
234 let mut chunks = Vec::with_capacity(len);
235 let mut prev = 0;
236 for _ in 0..len {
237 let time_offset = Compact::<u64>::decode(&mut input)?.0.wrapping_add(prev);
238 let buf: Vec<u8> = Decode::decode(&mut input)?;
239 chunks.push(ConsoleChunk { time_offset, buf });
240 prev = time_offset;
241 }
242 let encoded_size = input.count() as usize - Compact::compact_len(&(chunks.len() as u64));
244 Ok(Self { chunks, encoded_size })
245 }
246}
247
248#[derive(Default)]
252pub struct OutputBuffers {
253 console_buffers: [ConsoleChunks; 2],
254 buffers: [Vec<u8>; 2],
255}
256
257impl OutputBuffers {
258 pub fn from_segments(
265 segments: &[Segment],
266 stream_len: &[u32; OutputStream::COUNT],
267 ) -> Result<Self, codec::Error> {
268 let total_len: usize = stream_len.iter().map(|len| *len as usize).sum();
269 let mut input = SegmentedInput::new(segments, 0, total_len);
270 let mut buffers = Self::default();
271 for (src_len, stream) in stream_len.iter().zip(OutputStream::ALL) {
272 if *src_len == 0 {
273 continue;
274 }
275 let src_len = *src_len as usize;
276 debug_assert!(
277 src_len <= input.end - input.offset,
278 "src len = {src_len}, remaining len = {}",
279 input.end - input.offset
280 );
281 buffers.set_stream(stream, src_len, &mut input)?;
282 }
283 Ok(buffers)
284 }
285
286 pub fn from_segments_one(
288 segments: &[Segment],
289 stream: OutputStream,
290 stream_start: usize,
291 stream_end: usize,
292 ) -> Result<Self, codec::Error> {
293 let src_len = stream_end - stream_start;
294 let mut input = SegmentedInput::new(segments, stream_start, stream_end);
295 let mut buffers = Self::default();
296 debug_assert!(
297 src_len <= input.end - input.offset,
298 "src len = {src_len}, remaining len = {}",
299 input.end - input.offset
300 );
301 buffers.set_stream(stream, src_len, &mut input)?;
302 Ok(buffers)
303 }
304
305 fn set_stream(
306 &mut self,
307 i: OutputStream,
308 src_len: usize,
309 input: &mut SegmentedInput<'_>,
310 ) -> Result<(), codec::Error> {
311 use OutputStream::*;
312 match i {
313 Stdout | Stderr => {
314 let old_offset = input.offset;
315 let chunks = ConsoleChunks::decode(input)?;
316 if input.offset - old_offset != src_len {
317 return Err("Invalid console chunks size".into());
318 }
319 self.console_buffers[i as usize] = chunks;
320 },
321 Video | Audio => {
322 let dst = &mut self.buffers[i as usize - 2];
323 let dst_offset = dst.len();
324 dst.resize(dst_offset + src_len, 0_u8);
325 input.read(&mut dst[dst_offset..]);
326 },
327 }
328 Ok(())
329 }
330
331 pub fn get_console_buf(&self, i: OutputStream) -> &[ConsoleChunk] {
333 assert!(matches!(i, OutputStream::Stdout | OutputStream::Stderr));
334 &self.console_buffers[i as usize][..]
335 }
336
337 pub fn take_console_buf(&mut self, i: OutputStream) -> Vec<ConsoleChunk> {
339 assert!(matches!(i, OutputStream::Stdout | OutputStream::Stderr));
340 core::mem::take(&mut self.console_buffers[i as usize]).into_inner()
341 }
342
343 pub fn get_encoded_buf(&self, i: OutputStream) -> &[u8] {
345 assert!(matches!(i, OutputStream::Video | OutputStream::Audio));
346 &self.buffers[i as usize - 2]
347 }
348
349 pub fn take_encoded_buf(&mut self, i: OutputStream) -> Vec<u8> {
351 assert!(matches!(i, OutputStream::Video | OutputStream::Audio));
352 core::mem::take(&mut self.buffers[i as usize - 2])
353 }
354
355 pub fn clear(&mut self) {
357 for buf in self.console_buffers.iter_mut() {
358 buf.clear();
359 }
360 for buf in self.buffers.iter_mut() {
361 buf.clear();
362 }
363 }
364}
365
366#[derive(Debug)]
370struct SegmentedInput<'a> {
371 segments: &'a [Segment],
372 end: usize,
373 offset: usize,
374}
375
376impl<'a> SegmentedInput<'a> {
377 fn new(segments: &'a [Segment], start: usize, end: usize) -> Self {
378 Self { segments, end, offset: start }
379 }
380
381 fn read(&mut self, dest: &mut [u8]) {
382 let dest_len = dest.len();
383 let mut dest_offset = 0;
384 while dest_offset != dest_len {
385 let i = self.offset / SEGMENT_LEN;
386 let src_offset = self.offset % SEGMENT_LEN;
387 let n = (SEGMENT_LEN - src_offset).min(dest_len - dest_offset);
388 dest[dest_offset..dest_offset + n]
389 .copy_from_slice(&self.segments[i][src_offset..src_offset + n]);
390 dest_offset += n;
391 self.offset += n;
392 }
393 }
394}
395
396impl codec::Input for SegmentedInput<'_> {
397 fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
398 Ok(Some(self.end - self.offset))
399 }
400
401 fn read(&mut self, dest: &mut [u8]) -> Result<(), codec::Error> {
402 SegmentedInput::read(self, dest);
403 Ok(())
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410 use alloc::{vec, vec::Vec};
411 use rand::Rng;
412
413 #[test]
414 fn segmented_input_works() {
415 #[derive(Encode, Decode, Debug, PartialEq, Eq)]
416 struct Dummy {
417 x: u64,
418 y: u32,
419 z: Vec<Dummy>,
420 }
421 let mut rng = rand::rng();
422 let num_values = rng.random_range(0..1000);
423 let mut values: Vec<Dummy> = Vec::with_capacity(num_values);
424 for _ in 0..num_values {
425 let z_len = rng.random_range(0..10);
426 let mut z = Vec::with_capacity(z_len);
427 for _ in 0..z_len {
428 z.push(Dummy { x: rng.random(), y: rng.random(), z: Vec::new() });
429 }
430 values.push(Dummy { x: rng.random(), y: rng.random(), z });
431 }
432 let mut encoded = values.encode();
433 let encoded_len = encoded.len();
434 while !encoded.len().is_multiple_of(SEGMENT_LEN) {
435 encoded.push(0_u8);
436 }
437 let segments = encoded
438 .chunks(SEGMENT_LEN)
439 .map(|slice| slice.to_vec().try_into().unwrap())
440 .collect::<Vec<Segment>>();
441 let mut input = SegmentedInput::new(&segments, 0, encoded_len);
442 let actual_values: Vec<Dummy> = Decode::decode(&mut input).unwrap();
443 assert_eq!(values, actual_values);
444 }
445
446 #[test]
447 fn console_chunks_encoded_size_works() {
448 macro_rules! check {
449 ($chunks: expr) => {{
450 let chunks = $chunks;
451 assert_eq!(chunks.encode().len(), chunks.encoded_size(), "Chunks = {chunks:?}");
452 }};
453 }
454 check!(ConsoleChunks::default());
455 check!({
456 let mut chunks = ConsoleChunks::new();
457 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
458 chunks
459 });
460 check!({
461 let mut chunks = ConsoleChunks::new();
462 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 0] });
463 chunks
464 });
465 check!({
466 let mut chunks = ConsoleChunks::new();
467 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
468 chunks.append(99999, vec![0_u8; 123].into());
469 chunks
470 });
471 check!({
472 let mut chunks = ConsoleChunks::new();
473 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
474 chunks.append(100_000, vec![0_u8; 123].into());
475 chunks
476 });
477 check!({
478 let mut chunks = ConsoleChunks::new();
479 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
480 let _ = chunks.pre_allocate(100_000, 123);
481 chunks
482 });
483 check!({
484 let mut chunks = ConsoleChunks::new();
485 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
486 let _ = chunks.pre_allocate(100_000, 123);
487 let bytes = chunks.encode();
488 ConsoleChunks::decode(&mut &bytes[..]).unwrap()
489 });
490 }
491
492 #[test]
493 fn console_chunks_encoded_size_after_works() {
494 let mut chunks = ConsoleChunks::new();
495 let expected_encoded_size = chunks.encoded_size_after(99999, 123).unwrap();
496 chunks.push(ConsoleChunk { time_offset: 99999, buf: vec![0_u8; 123] });
497 assert_eq!(expected_encoded_size, chunks.encode().len());
498 }
499}