msgpack_tracing/
storage.rs

1use crate::{
2    string_cache::{CacheInstruction, CacheInstructionSet, CacheString},
3    tape::{
4        FieldValue, Instruction, InstructionId, InstructionSet, InstructionTrait, TapeMachine,
5        Value,
6    },
7};
8use chrono::DateTime;
9use rmp::{Marker, decode, encode};
10use std::{
11    io::{self, BufRead, BufReader, Read},
12    num::NonZeroU64,
13};
14use tracing::Level;
15
/// Tape machine that serializes every instruction it handles into the wrapped writer.
#[derive(Debug)]
pub struct Store<W>(W);
17impl<W> Store<W>
18where
19    W: io::Write + Send + 'static,
20{
21    pub fn new(out: W) -> Self {
22        Self(out)
23    }
24
25    pub fn do_handle(write: &mut W, instruction: Instruction) -> io::Result<()> {
26        let instruction = match instruction {
27            Instruction::Restart => CacheInstruction::Restart,
28            Instruction::NewSpan { parent, span, name } => {
29                let name = CacheString::Present(name);
30                CacheInstruction::NewSpan { parent, span, name }
31            }
32            Instruction::FinishedSpan => CacheInstruction::FinishedSpan,
33            Instruction::NewRecord(span) => CacheInstruction::NewRecord(span),
34            Instruction::FinishedRecord => CacheInstruction::FinishedRecord,
35            Instruction::StartEvent {
36                time,
37                span,
38                target,
39                priority,
40            } => {
41                let target = CacheString::Present(target);
42                CacheInstruction::StartEvent {
43                    time,
44                    span,
45                    target,
46                    priority,
47                }
48            }
49            Instruction::FinishedEvent => CacheInstruction::FinishedEvent,
50            Instruction::AddValue(FieldValue { name, value }) => {
51                let name = CacheString::Present(name);
52                let value = match value {
53                    Value::Debug(str) => Value::String(CacheString::Present(str)),
54                    Value::String(str) => Value::String(CacheString::Present(str)),
55                    Value::Float(data) => Value::Float(data),
56                    Value::Integer(data) => Value::Integer(data),
57                    Value::Unsigned(data) => Value::Unsigned(data),
58                    Value::Bool(data) => Value::Bool(data),
59                    Value::ByteArray(items) => Value::ByteArray(items),
60                };
61
62                CacheInstruction::AddValue(FieldValue { name, value })
63            }
64            Instruction::DeleteSpan(span) => CacheInstruction::DeleteSpan(span),
65        };
66
67        Self::do_handle_cached(write, instruction)
68    }
69
70    pub fn do_handle_cached(write: &mut W, instruction: CacheInstruction) -> io::Result<()> {
71        write.write_all(&[instruction.id().into()])?;
72        match instruction {
73            CacheInstruction::Restart => (),
74            CacheInstruction::NewString(data) => encode::write_str(write, data)?,
75            CacheInstruction::NewSpan { parent, span, name } => {
76                let parent = parent.map(Into::into).unwrap_or(0);
77                let span = span.into();
78                encode::write_uint(write, parent)?;
79                encode::write_uint(write, span)?;
80                Self::write_cache_str(write, name)?;
81            }
82            CacheInstruction::FinishedSpan => (),
83            CacheInstruction::NewRecord(span) => {
84                let span: u64 = span.into();
85                encode::write_uint(write, span)?;
86            }
87            CacheInstruction::FinishedRecord => (),
88            CacheInstruction::StartEvent {
89                time,
90                span,
91                target,
92                priority,
93            } => {
94                let time2 = time.timestamp_subsec_nanos();
95                let time = time.timestamp() as u64;
96                let span = span.map(Into::into).unwrap_or(0);
97                let priority = priority_num(priority);
98
99                encode::write_uint(write, time)?;
100                encode::write_uint(write, time2 as u64)?;
101                encode::write_uint(write, span)?;
102                Self::write_cache_str(write, target)?;
103                encode::write_uint(write, priority)?;
104            }
105            CacheInstruction::FinishedEvent => (),
106            CacheInstruction::AddValue(field_value) => {
107                Self::write_cache_str(write, field_value.name)?;
108                Self::write_cache_value(write, field_value.value)?;
109            }
110            CacheInstruction::DeleteSpan(span) => {
111                let span = span.into();
112                encode::write_uint(write, span)?;
113            }
114        }
115        write.flush()?;
116
117        Ok(())
118    }
119
120    fn write_cache_str(write: &mut W, str: CacheString) -> io::Result<()> {
121        match str {
122            CacheString::Present(data) => encode::write_str(write, data)?,
123            CacheString::Cached(index) => {
124                CacheIndex::from(index).write(write)?;
125            }
126        }
127
128        Ok(())
129    }
130
131    fn write_cache_value(write: &mut W, value: Value<CacheString>) -> io::Result<()> {
132        match value {
133            Value::Debug(str) => {
134                encode::write_array_len(write, 1)?;
135                Self::write_cache_str(write, str)?;
136            }
137            Value::String(str) => Self::write_cache_str(write, str)?,
138            Value::Float(data) => encode::write_f64(write, data)?,
139            Value::Integer(data) => {
140                encode::write_sint(write, data)?;
141            }
142            Value::Unsigned(data) => {
143                encode::write_uint(write, data)?;
144            }
145            Value::Bool(data) => encode::write_bool(write, data)?,
146            Value::ByteArray(data) => encode::write_bin(write, data)?,
147        }
148
149        Ok(())
150    }
151}
152impl<W> TapeMachine<CacheInstructionSet> for Store<W>
153where
154    W: io::Write + Send + 'static,
155{
156    fn needs_restart(&mut self) -> bool {
157        false
158    }
159
160    fn handle(&mut self, instruction: CacheInstruction) {
161        let _ = Self::do_handle_cached(&mut self.0, instruction);
162    }
163}
164impl<W> TapeMachine<InstructionSet> for Store<W>
165where
166    W: io::Write + Send + 'static,
167{
168    fn needs_restart(&mut self) -> bool {
169        false
170    }
171
172    fn handle(&mut self, instruction: Instruction) {
173        let _ = Self::do_handle(&mut self.0, instruction);
174    }
175}
176
/// Decoder that reads serialized instructions back from a buffered reader.
#[derive(Debug)]
pub struct Load<R> {
    /// Buffered source of encoded instructions.
    read: BufReader<R>,
    /// Scratch space backing decoded strings (span names, event targets, field names).
    buf1: Vec<u8>,
    /// Scratch space backing the value of an `AddValue` instruction, kept separate from
    /// `buf1` because the field name and its value are borrowed at the same time.
    buf2: Vec<u8>,
    /// Whether a `Restart` opcode has been seen since construction or the last `restart`.
    started: bool,
}
183impl<R> Load<R>
184where
185    R: io::Read,
186{
187    pub fn new(input: R) -> Self {
188        Self {
189            read: BufReader::new(input),
190            buf1: Default::default(),
191            buf2: Default::default(),
192            started: false,
193        }
194    }
195
    /// Marks the loader as not started, so the next fetch discards input bytes until it
    /// reads a `Restart` opcode again.
    pub fn restart(&mut self) {
        self.started = false;
    }
199
200    pub fn forward<T>(&mut self, machine: &mut T) -> io::Result<()>
201    where
202        T: TapeMachine<InstructionSet>,
203    {
204        while let Some(instruction) = self.fetch_one()? {
205            machine.handle(instruction);
206        }
207
208        Ok(())
209    }
210
211    pub fn forward_cached<T>(&mut self, machine: &mut T) -> io::Result<()>
212    where
213        T: TapeMachine<CacheInstructionSet>,
214    {
215        while let Some(instruction) = self.fetch_one_cached()? {
216            machine.handle(instruction);
217        }
218
219        Ok(())
220    }
221
222    pub fn fetch_one(&mut self) -> io::Result<Option<Instruction>> {
223        let Some(instruction) = self.fetch_one_cached()? else {
224            return Ok(None);
225        };
226
227        Ok(Some(match instruction {
228            CacheInstruction::Restart => Instruction::Restart,
229            CacheInstruction::NewString(_) => return Err(UnexpectedCached.into()),
230            CacheInstruction::NewSpan { parent, span, name } => {
231                let name = match name {
232                    CacheString::Present(str) => str,
233                    CacheString::Cached(_) => return Err(UnexpectedCached.into()),
234                };
235
236                Instruction::NewSpan { parent, span, name }
237            }
238            CacheInstruction::FinishedSpan => Instruction::FinishedSpan,
239            CacheInstruction::NewRecord(span) => Instruction::NewRecord(span),
240            CacheInstruction::FinishedRecord => Instruction::FinishedRecord,
241            CacheInstruction::StartEvent {
242                time,
243                span,
244                target,
245                priority,
246            } => {
247                let target = match target {
248                    CacheString::Present(str) => str,
249                    CacheString::Cached(_) => return Err(UnexpectedCached.into()),
250                };
251
252                Instruction::StartEvent {
253                    time,
254                    span,
255                    target,
256                    priority,
257                }
258            }
259            CacheInstruction::FinishedEvent => Instruction::FinishedEvent,
260            CacheInstruction::AddValue(FieldValue { name, value }) => {
261                let name = match name {
262                    CacheString::Present(str) => str,
263                    CacheString::Cached(_) => return Err(UnexpectedCached.into()),
264                };
265                let value = match value {
266                    Value::Debug(CacheString::Present(str)) => Value::Debug(str),
267                    Value::Debug(CacheString::Cached(_)) => return Err(UnexpectedCached.into()),
268                    Value::String(CacheString::Present(str)) => Value::String(str),
269                    Value::String(CacheString::Cached(_)) => return Err(UnexpectedCached.into()),
270                    Value::Float(value) => Value::Float(value),
271                    Value::Integer(value) => Value::Integer(value),
272                    Value::Unsigned(value) => Value::Unsigned(value),
273                    Value::Bool(value) => Value::Bool(value),
274                    Value::ByteArray(items) => Value::ByteArray(items),
275                };
276
277                Instruction::AddValue(FieldValue { name, value })
278            }
279            CacheInstruction::DeleteSpan(span) => Instruction::DeleteSpan(span),
280        }))
281    }
282
283    pub fn fetch_one_cached(&mut self) -> io::Result<Option<CacheInstruction>> {
284        let instruction = loop {
285            let Some(instruction) = self.read.fill_buf()?.first().copied() else {
286                return Ok(None);
287            };
288            self.read.consume(1);
289
290            if self.started {
291                break instruction;
292            }
293
294            if instruction == u8::from(InstructionId::Restart) {
295                self.started = true;
296                break instruction;
297            }
298        };
299
300        let instruction = InstructionId::try_from(instruction).map_err(|e| {
301            io::Error::new(io::ErrorKind::InvalidData, format!("bad instruction {e}"))
302        })?;
303
304        Ok(Some(match instruction {
305            InstructionId::Restart => CacheInstruction::Restart,
306            InstructionId::NewString => CacheInstruction::NewString(self.read_str()?),
307            InstructionId::NewSpan => {
308                let parent: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
309                let span: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
310                let name = self.read_cache_str()?;
311
312                CacheInstruction::NewSpan {
313                    parent: NonZeroU64::new(parent),
314                    span: NonZeroU64::new(span).ok_or(ZeroSpan)?,
315                    name,
316                }
317            }
318            InstructionId::FinishedSpan => CacheInstruction::FinishedSpan,
319            InstructionId::NewRecord => {
320                let span = decode::read_int(&mut self.read).map_err(decode_err)?;
321
322                CacheInstruction::NewRecord(NonZeroU64::new(span).ok_or(ZeroSpan)?)
323            }
324            InstructionId::FinishedRecord => CacheInstruction::FinishedRecord,
325            InstructionId::StartEvent => {
326                let time: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
327                let time2: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
328                let span = decode::read_int(&mut self.read).map_err(decode_err)?;
329                let target = Self::do_read_cache_str(&mut self.read, &mut self.buf1)?;
330                let priority = num_priority(decode::read_int(&mut self.read).map_err(decode_err)?);
331
332                CacheInstruction::StartEvent {
333                    time: DateTime::from_timestamp(time as i64, time2 as u32).unwrap_or_default(),
334                    span: NonZeroU64::new(span),
335                    target,
336                    priority,
337                }
338            }
339            InstructionId::FinishedEvent => CacheInstruction::FinishedEvent,
340            InstructionId::AddValue => {
341                let name = Self::do_read_cache_str(&mut self.read, &mut self.buf1)?;
342                let value = Self::do_read_value(&mut self.read, &mut self.buf2)?;
343
344                CacheInstruction::AddValue(FieldValue { name, value })
345            }
346            InstructionId::DeleteSpan => {
347                let span: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
348                CacheInstruction::DeleteSpan(NonZeroU64::new(span).ok_or(ZeroSpan)?)
349            }
350        }))
351    }
352
353    fn read_str(&mut self) -> io::Result<&str> {
354        Self::do_read_str(&mut self.read, &mut self.buf1)
355    }
356
357    fn do_read_str<'a>(read: &mut BufReader<R>, buf: &'a mut Vec<u8>) -> io::Result<&'a str> {
358        let len = decode::read_str_len(read).map_err(decode_err)?;
359        buf.resize(len as usize, 0);
360        read.read_exact(buf.as_mut_slice())?;
361
362        std::str::from_utf8(buf.as_slice()).map_err(decode_err)
363    }
364
365    fn do_read_value<'a>(
366        read: &mut BufReader<R>,
367        buf: &'a mut Vec<u8>,
368    ) -> io::Result<Value<'a, CacheString<'a>>> {
369        Ok(match Self::do_peek_marker(read)? {
370            Marker::FixArray(1) => {
371                read.consume(1);
372                Value::Debug(Self::do_read_cache_str(read, buf)?)
373            }
374            Marker::FixPos(_)
375            | Marker::FixNeg(_)
376            | Marker::I8
377            | Marker::I16
378            | Marker::I32
379            | Marker::I64 => Value::Integer(decode::read_int(read).map_err(decode_err)?),
380            Marker::FixStr(_)
381            | Marker::Str8
382            | Marker::Str16
383            | Marker::Str32
384            | Marker::FixExt1
385            | Marker::FixExt2
386            | Marker::FixExt4
387            | Marker::FixExt8 => Value::String(Self::do_read_cache_str(read, buf)?),
388            Marker::False => Value::Bool(false),
389            Marker::True => Value::Bool(true),
390            Marker::Bin8 | Marker::Bin16 | Marker::Bin32 => {
391                let n = decode::read_bin_len(read).map_err(decode_err)?;
392                buf.resize(n as usize, 0);
393                read.read_exact(buf)?;
394                Value::ByteArray(buf)
395            }
396            Marker::F32 => Value::Float(decode::read_f32(read).map_err(decode_err)? as f64),
397            Marker::F64 => Value::Float(decode::read_f64(read).map_err(decode_err)?),
398            Marker::U8 | Marker::U16 | Marker::U32 | Marker::U64 => {
399                Value::Unsigned(decode::read_int(read).map_err(decode_err)?)
400            }
401            marker => return Err(UnexpectedMarker(marker).into()),
402        })
403    }
404
405    fn read_cache_str(&mut self) -> io::Result<CacheString> {
406        Self::do_read_cache_str(&mut self.read, &mut self.buf1)
407    }
408
409    fn do_read_cache_str<'a>(
410        read: &mut BufReader<R>,
411        buf: &'a mut Vec<u8>,
412    ) -> io::Result<CacheString<'a>> {
413        Ok(match Self::do_peek_marker(read)? {
414            Marker::FixStr(_) | Marker::Str8 | Marker::Str16 | Marker::Str32 => {
415                CacheString::Present(Self::do_read_str(read, buf)?)
416            }
417            Marker::FixExt1 | Marker::FixExt2 | Marker::FixExt4 | Marker::FixExt8 => {
418                CacheString::Cached(CacheIndex::read(read)?.into())
419            }
420            marker => return Err(UnexpectedMarker(marker).into()),
421        })
422    }
423
424    fn do_peek_marker(read: &mut BufReader<R>) -> io::Result<Marker> {
425        let marker = read.fill_buf()?.first().ok_or(EofOnMarker)?;
426
427        Ok(Marker::from_u8(*marker))
428    }
429}
430
431pub fn priority_num(level: Level) -> u64 {
432    match level {
433        Level::TRACE => 0,
434        Level::DEBUG => 1,
435        Level::INFO => 2,
436        Level::WARN => 3,
437        Level::ERROR => 4,
438    }
439}
440
441pub fn num_priority(num: u64) -> Level {
442    match num {
443        0 => Level::TRACE,
444        1 => Level::DEBUG,
445        2 => Level::INFO,
446        3 => Level::WARN,
447        4 => Level::ERROR,
448        _ => Level::ERROR,
449    }
450}
451
/// Wraps any displayable error as an [`io::Error`] of kind `InvalidInput`, using the
/// error's text as the message.
fn decode_err<E: ToString>(error: E) -> io::Error {
    let message = error.to_string();
    io::Error::new(io::ErrorKind::InvalidInput, message)
}
455
456#[derive(thiserror::Error, Debug)]
457#[error("Unexpected type {0:?}")]
458pub struct UnexpectedMarker(Marker);
459impl From<UnexpectedMarker> for io::Error {
460    fn from(value: UnexpectedMarker) -> Self {
461        decode_err(value)
462    }
463}
464
465#[derive(thiserror::Error, Debug)]
466#[error("Expecting Msgpack Marker, got EOF")]
467pub struct EofOnMarker;
468impl From<EofOnMarker> for io::Error {
469    fn from(value: EofOnMarker) -> Self {
470        decode_err(value)
471    }
472}
473
474#[derive(thiserror::Error, Debug)]
475#[error("Span should not have value of zero")]
476pub struct ZeroSpan;
477impl From<ZeroSpan> for io::Error {
478    fn from(value: ZeroSpan) -> Self {
479        decode_err(value)
480    }
481}
482
483#[derive(thiserror::Error, Debug)]
484#[error("Trying to load cached instruction file into uncached machine")]
485pub struct UnexpectedCached;
486impl From<UnexpectedCached> for io::Error {
487    fn from(value: UnexpectedCached) -> Self {
488        decode_err(value)
489    }
490}
491
/// Reference to a string-cache slot, held as the raw bytes written after a MessagePack
/// fixext marker.
///
/// Each variant stores the index in little-endian order, in the smallest form that
/// holds it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheIndex {
    /// Index that fits in two bytes.
    U16 { data: [u8; 2] },
    /// Index that fits in three bytes.
    U24 { data: [u8; 3] },
    /// Index that fits in five bytes.
    U40 { data: [u8; 5] },
    /// Full 64-bit index: `data[0]` is a padding byte, `data[1..]` holds the eight
    /// index bytes.
    U64 { data: [u8; 9] },
}
499impl From<CacheIndex> for u64 {
500    fn from(value: CacheIndex) -> Self {
501        match value {
502            CacheIndex::U16 { data } => u64::from_le_bytes([data[0], data[1], 0, 0, 0, 0, 0, 0]),
503            CacheIndex::U24 { data } => {
504                u64::from_le_bytes([data[0], data[1], data[2], 0, 0, 0, 0, 0])
505            }
506            CacheIndex::U40 { data } => {
507                u64::from_le_bytes([data[0], data[1], data[2], data[3], data[4], 0, 0, 0])
508            }
509            CacheIndex::U64 { data } => u64::from_le_bytes([
510                data[1], data[2], data[3], data[4], data[5], data[6], data[7], data[8],
511            ]),
512        }
513    }
514}
515impl From<u64> for CacheIndex {
516    fn from(value: u64) -> Self {
517        let bytes = value.to_le_bytes();
518        match bytes {
519            [data0, data1, 0, 0, 0, 0, 0, 0] => CacheIndex::U16 {
520                data: [data0, data1],
521            },
522            [data0, data1, data2, 0, 0, 0, 0, 0] => CacheIndex::U24 {
523                data: [data0, data1, data2],
524            },
525            [data0, data1, data2, data3, data4, 0, 0, 0] => CacheIndex::U40 {
526                data: [data0, data1, data2, data3, data4],
527            },
528            data => CacheIndex::U64 {
529                data: [
530                    0, data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
531                ],
532            },
533        }
534    }
535}
536impl CacheIndex {
537    pub fn marker(self) -> Marker {
538        match self {
539            CacheIndex::U16 { .. } => Marker::FixExt1,
540            CacheIndex::U24 { .. } => Marker::FixExt2,
541            CacheIndex::U40 { .. } => Marker::FixExt4,
542            CacheIndex::U64 { .. } => Marker::FixExt8,
543        }
544    }
545
546    pub fn data(&self) -> &[u8] {
547        match self {
548            CacheIndex::U16 { data } => data.as_slice(),
549            CacheIndex::U24 { data } => data.as_slice(),
550            CacheIndex::U40 { data } => data.as_slice(),
551            CacheIndex::U64 { data } => data.as_slice(),
552        }
553    }
554
555    pub fn data_mut(&mut self) -> &mut [u8] {
556        match self {
557            CacheIndex::U16 { data } => data.as_mut_slice(),
558            CacheIndex::U24 { data } => data.as_mut_slice(),
559            CacheIndex::U40 { data } => data.as_mut_slice(),
560            CacheIndex::U64 { data } => data.as_mut_slice(),
561        }
562    }
563
564    pub fn write<W>(self, mut write: W) -> io::Result<()>
565    where
566        W: io::Write,
567    {
568        write.write_all(&[self.marker().to_u8()])?;
569        write.write_all(self.data())?;
570        Ok(())
571    }
572
573    pub fn read<R>(mut read: R) -> io::Result<Self>
574    where
575        R: io::Read,
576    {
577        let mut marker = [0];
578        read.read_exact(&mut marker)?;
579        let marker = Marker::from_u8(marker[0]);
580
581        let mut r = match marker {
582            Marker::FixExt1 => CacheIndex::U16 {
583                data: Default::default(),
584            },
585            Marker::FixExt2 => CacheIndex::U24 {
586                data: Default::default(),
587            },
588            Marker::FixExt4 => CacheIndex::U40 {
589                data: Default::default(),
590            },
591            Marker::FixExt8 => CacheIndex::U64 {
592                data: Default::default(),
593            },
594            marker => return Err(UnexpectedMarker(marker).into()),
595        };
596
597        read.read_exact(r.data_mut())?;
598
599        Ok(r)
600    }
601}