1use crate::{
2 string_cache::{CacheInstruction, CacheInstructionSet, CacheString},
3 tape::{
4 FieldValue, Instruction, InstructionId, InstructionSet, InstructionTrait, TapeMachine,
5 Value,
6 },
7};
8use chrono::DateTime;
9use rmp::{Marker, decode, encode};
10use std::{
11 io::{self, BufRead, BufReader, Read},
12 num::NonZeroU64,
13};
14use tracing::Level;
15
16pub struct Store<W>(W);
17impl<W> Store<W>
18where
19 W: io::Write + Send + 'static,
20{
21 pub fn new(out: W) -> Self {
22 Self(out)
23 }
24
25 pub fn do_handle(write: &mut W, instruction: Instruction) -> io::Result<()> {
26 let instruction = match instruction {
27 Instruction::Restart => CacheInstruction::Restart,
28 Instruction::NewSpan { parent, span, name } => {
29 let name = CacheString::Present(name);
30 CacheInstruction::NewSpan { parent, span, name }
31 }
32 Instruction::FinishedSpan => CacheInstruction::FinishedSpan,
33 Instruction::NewRecord(span) => CacheInstruction::NewRecord(span),
34 Instruction::FinishedRecord => CacheInstruction::FinishedRecord,
35 Instruction::StartEvent {
36 time,
37 span,
38 target,
39 priority,
40 } => {
41 let target = CacheString::Present(target);
42 CacheInstruction::StartEvent {
43 time,
44 span,
45 target,
46 priority,
47 }
48 }
49 Instruction::FinishedEvent => CacheInstruction::FinishedEvent,
50 Instruction::AddValue(FieldValue { name, value }) => {
51 let name = CacheString::Present(name);
52 let value = match value {
53 Value::Debug(str) => Value::String(CacheString::Present(str)),
54 Value::String(str) => Value::String(CacheString::Present(str)),
55 Value::Float(data) => Value::Float(data),
56 Value::Integer(data) => Value::Integer(data),
57 Value::Unsigned(data) => Value::Unsigned(data),
58 Value::Bool(data) => Value::Bool(data),
59 Value::ByteArray(items) => Value::ByteArray(items),
60 };
61
62 CacheInstruction::AddValue(FieldValue { name, value })
63 }
64 Instruction::DeleteSpan(span) => CacheInstruction::DeleteSpan(span),
65 };
66
67 Self::do_handle_cached(write, instruction)
68 }
69
70 pub fn do_handle_cached(write: &mut W, instruction: CacheInstruction) -> io::Result<()> {
71 write.write_all(&[instruction.id().into()])?;
72 match instruction {
73 CacheInstruction::Restart => (),
74 CacheInstruction::NewString(data) => encode::write_str(write, data)?,
75 CacheInstruction::NewSpan { parent, span, name } => {
76 let parent = parent.map(Into::into).unwrap_or(0);
77 let span = span.into();
78 encode::write_uint(write, parent)?;
79 encode::write_uint(write, span)?;
80 Self::write_cache_str(write, name)?;
81 }
82 CacheInstruction::FinishedSpan => (),
83 CacheInstruction::NewRecord(span) => {
84 let span: u64 = span.into();
85 encode::write_uint(write, span)?;
86 }
87 CacheInstruction::FinishedRecord => (),
88 CacheInstruction::StartEvent {
89 time,
90 span,
91 target,
92 priority,
93 } => {
94 let time2 = time.timestamp_subsec_nanos();
95 let time = time.timestamp() as u64;
96 let span = span.map(Into::into).unwrap_or(0);
97 let priority = priority_num(priority);
98
99 encode::write_uint(write, time)?;
100 encode::write_uint(write, time2 as u64)?;
101 encode::write_uint(write, span)?;
102 Self::write_cache_str(write, target)?;
103 encode::write_uint(write, priority)?;
104 }
105 CacheInstruction::FinishedEvent => (),
106 CacheInstruction::AddValue(field_value) => {
107 Self::write_cache_str(write, field_value.name)?;
108 Self::write_cache_value(write, field_value.value)?;
109 }
110 CacheInstruction::DeleteSpan(span) => {
111 let span = span.into();
112 encode::write_uint(write, span)?;
113 }
114 }
115 write.flush()?;
116
117 Ok(())
118 }
119
120 fn write_cache_str(write: &mut W, str: CacheString) -> io::Result<()> {
121 match str {
122 CacheString::Present(data) => encode::write_str(write, data)?,
123 CacheString::Cached(index) => {
124 CacheIndex::from(index).write(write)?;
125 }
126 }
127
128 Ok(())
129 }
130
131 fn write_cache_value(write: &mut W, value: Value<CacheString>) -> io::Result<()> {
132 match value {
133 Value::Debug(str) => {
134 encode::write_array_len(write, 1)?;
135 Self::write_cache_str(write, str)?;
136 }
137 Value::String(str) => Self::write_cache_str(write, str)?,
138 Value::Float(data) => encode::write_f64(write, data)?,
139 Value::Integer(data) => {
140 encode::write_sint(write, data)?;
141 }
142 Value::Unsigned(data) => {
143 encode::write_uint(write, data)?;
144 }
145 Value::Bool(data) => encode::write_bool(write, data)?,
146 Value::ByteArray(data) => encode::write_bin(write, data)?,
147 }
148
149 Ok(())
150 }
151}
152impl<W> TapeMachine<CacheInstructionSet> for Store<W>
153where
154 W: io::Write + Send + 'static,
155{
156 fn needs_restart(&mut self) -> bool {
157 false
158 }
159
160 fn handle(&mut self, instruction: CacheInstruction) {
161 let _ = Self::do_handle_cached(&mut self.0, instruction);
162 }
163}
164impl<W> TapeMachine<InstructionSet> for Store<W>
165where
166 W: io::Write + Send + 'static,
167{
168 fn needs_restart(&mut self) -> bool {
169 false
170 }
171
172 fn handle(&mut self, instruction: Instruction) {
173 let _ = Self::do_handle(&mut self.0, instruction);
174 }
175}
176
177pub struct Load<R> {
178 read: BufReader<R>,
179 buf1: Vec<u8>,
180 buf2: Vec<u8>,
181 started: bool,
182}
183impl<R> Load<R>
184where
185 R: io::Read,
186{
187 pub fn new(input: R) -> Self {
188 Self {
189 read: BufReader::new(input),
190 buf1: Default::default(),
191 buf2: Default::default(),
192 started: false,
193 }
194 }
195
196 pub fn restart(&mut self) {
197 self.started = false;
198 }
199
200 pub fn forward<T>(&mut self, machine: &mut T) -> io::Result<()>
201 where
202 T: TapeMachine<InstructionSet>,
203 {
204 while let Some(instruction) = self.fetch_one()? {
205 machine.handle(instruction);
206 }
207
208 Ok(())
209 }
210
211 pub fn forward_cached<T>(&mut self, machine: &mut T) -> io::Result<()>
212 where
213 T: TapeMachine<CacheInstructionSet>,
214 {
215 while let Some(instruction) = self.fetch_one_cached()? {
216 machine.handle(instruction);
217 }
218
219 Ok(())
220 }
221
222 pub fn fetch_one(&mut self) -> io::Result<Option<Instruction>> {
223 let Some(instruction) = self.fetch_one_cached()? else {
224 return Ok(None);
225 };
226
227 Ok(Some(match instruction {
228 CacheInstruction::Restart => Instruction::Restart,
229 CacheInstruction::NewString(_) => return Err(UnexpectedCached.into()),
230 CacheInstruction::NewSpan { parent, span, name } => {
231 let name = match name {
232 CacheString::Present(str) => str,
233 CacheString::Cached(_) => return Err(UnexpectedCached.into()),
234 };
235
236 Instruction::NewSpan { parent, span, name }
237 }
238 CacheInstruction::FinishedSpan => Instruction::FinishedSpan,
239 CacheInstruction::NewRecord(span) => Instruction::NewRecord(span),
240 CacheInstruction::FinishedRecord => Instruction::FinishedRecord,
241 CacheInstruction::StartEvent {
242 time,
243 span,
244 target,
245 priority,
246 } => {
247 let target = match target {
248 CacheString::Present(str) => str,
249 CacheString::Cached(_) => return Err(UnexpectedCached.into()),
250 };
251
252 Instruction::StartEvent {
253 time,
254 span,
255 target,
256 priority,
257 }
258 }
259 CacheInstruction::FinishedEvent => Instruction::FinishedEvent,
260 CacheInstruction::AddValue(FieldValue { name, value }) => {
261 let name = match name {
262 CacheString::Present(str) => str,
263 CacheString::Cached(_) => return Err(UnexpectedCached.into()),
264 };
265 let value = match value {
266 Value::Debug(CacheString::Present(str)) => Value::Debug(str),
267 Value::Debug(CacheString::Cached(_)) => return Err(UnexpectedCached.into()),
268 Value::String(CacheString::Present(str)) => Value::String(str),
269 Value::String(CacheString::Cached(_)) => return Err(UnexpectedCached.into()),
270 Value::Float(value) => Value::Float(value),
271 Value::Integer(value) => Value::Integer(value),
272 Value::Unsigned(value) => Value::Unsigned(value),
273 Value::Bool(value) => Value::Bool(value),
274 Value::ByteArray(items) => Value::ByteArray(items),
275 };
276
277 Instruction::AddValue(FieldValue { name, value })
278 }
279 CacheInstruction::DeleteSpan(span) => Instruction::DeleteSpan(span),
280 }))
281 }
282
283 pub fn fetch_one_cached(&mut self) -> io::Result<Option<CacheInstruction>> {
284 let instruction = loop {
285 let Some(instruction) = self.read.fill_buf()?.first().copied() else {
286 return Ok(None);
287 };
288 self.read.consume(1);
289
290 if self.started {
291 break instruction;
292 }
293
294 if instruction == u8::from(InstructionId::Restart) {
295 self.started = true;
296 break instruction;
297 }
298 };
299
300 let instruction = InstructionId::try_from(instruction).map_err(|e| {
301 io::Error::new(io::ErrorKind::InvalidData, format!("bad instruction {e}"))
302 })?;
303
304 Ok(Some(match instruction {
305 InstructionId::Restart => CacheInstruction::Restart,
306 InstructionId::NewString => CacheInstruction::NewString(self.read_str()?),
307 InstructionId::NewSpan => {
308 let parent: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
309 let span: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
310 let name = self.read_cache_str()?;
311
312 CacheInstruction::NewSpan {
313 parent: NonZeroU64::new(parent),
314 span: NonZeroU64::new(span).ok_or(ZeroSpan)?,
315 name,
316 }
317 }
318 InstructionId::FinishedSpan => CacheInstruction::FinishedSpan,
319 InstructionId::NewRecord => {
320 let span = decode::read_int(&mut self.read).map_err(decode_err)?;
321
322 CacheInstruction::NewRecord(NonZeroU64::new(span).ok_or(ZeroSpan)?)
323 }
324 InstructionId::FinishedRecord => CacheInstruction::FinishedRecord,
325 InstructionId::StartEvent => {
326 let time: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
327 let time2: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
328 let span = decode::read_int(&mut self.read).map_err(decode_err)?;
329 let target = Self::do_read_cache_str(&mut self.read, &mut self.buf1)?;
330 let priority = num_priority(decode::read_int(&mut self.read).map_err(decode_err)?);
331
332 CacheInstruction::StartEvent {
333 time: DateTime::from_timestamp(time as i64, time2 as u32).unwrap_or_default(),
334 span: NonZeroU64::new(span),
335 target,
336 priority,
337 }
338 }
339 InstructionId::FinishedEvent => CacheInstruction::FinishedEvent,
340 InstructionId::AddValue => {
341 let name = Self::do_read_cache_str(&mut self.read, &mut self.buf1)?;
342 let value = Self::do_read_value(&mut self.read, &mut self.buf2)?;
343
344 CacheInstruction::AddValue(FieldValue { name, value })
345 }
346 InstructionId::DeleteSpan => {
347 let span: u64 = decode::read_int(&mut self.read).map_err(decode_err)?;
348 CacheInstruction::DeleteSpan(NonZeroU64::new(span).ok_or(ZeroSpan)?)
349 }
350 }))
351 }
352
353 fn read_str(&mut self) -> io::Result<&str> {
354 Self::do_read_str(&mut self.read, &mut self.buf1)
355 }
356
357 fn do_read_str<'a>(read: &mut BufReader<R>, buf: &'a mut Vec<u8>) -> io::Result<&'a str> {
358 let len = decode::read_str_len(read).map_err(decode_err)?;
359 buf.resize(len as usize, 0);
360 read.read_exact(buf.as_mut_slice())?;
361
362 std::str::from_utf8(buf.as_slice()).map_err(decode_err)
363 }
364
365 fn do_read_value<'a>(
366 read: &mut BufReader<R>,
367 buf: &'a mut Vec<u8>,
368 ) -> io::Result<Value<'a, CacheString<'a>>> {
369 Ok(match Self::do_peek_marker(read)? {
370 Marker::FixArray(1) => {
371 read.consume(1);
372 Value::Debug(Self::do_read_cache_str(read, buf)?)
373 }
374 Marker::FixPos(_)
375 | Marker::FixNeg(_)
376 | Marker::I8
377 | Marker::I16
378 | Marker::I32
379 | Marker::I64 => Value::Integer(decode::read_int(read).map_err(decode_err)?),
380 Marker::FixStr(_)
381 | Marker::Str8
382 | Marker::Str16
383 | Marker::Str32
384 | Marker::FixExt1
385 | Marker::FixExt2
386 | Marker::FixExt4
387 | Marker::FixExt8 => Value::String(Self::do_read_cache_str(read, buf)?),
388 Marker::False => Value::Bool(false),
389 Marker::True => Value::Bool(true),
390 Marker::Bin8 | Marker::Bin16 | Marker::Bin32 => {
391 let n = decode::read_bin_len(read).map_err(decode_err)?;
392 buf.resize(n as usize, 0);
393 read.read_exact(buf)?;
394 Value::ByteArray(buf)
395 }
396 Marker::F32 => Value::Float(decode::read_f32(read).map_err(decode_err)? as f64),
397 Marker::F64 => Value::Float(decode::read_f64(read).map_err(decode_err)?),
398 Marker::U8 | Marker::U16 | Marker::U32 | Marker::U64 => {
399 Value::Unsigned(decode::read_int(read).map_err(decode_err)?)
400 }
401 marker => return Err(UnexpectedMarker(marker).into()),
402 })
403 }
404
405 fn read_cache_str(&mut self) -> io::Result<CacheString> {
406 Self::do_read_cache_str(&mut self.read, &mut self.buf1)
407 }
408
409 fn do_read_cache_str<'a>(
410 read: &mut BufReader<R>,
411 buf: &'a mut Vec<u8>,
412 ) -> io::Result<CacheString<'a>> {
413 Ok(match Self::do_peek_marker(read)? {
414 Marker::FixStr(_) | Marker::Str8 | Marker::Str16 | Marker::Str32 => {
415 CacheString::Present(Self::do_read_str(read, buf)?)
416 }
417 Marker::FixExt1 | Marker::FixExt2 | Marker::FixExt4 | Marker::FixExt8 => {
418 CacheString::Cached(CacheIndex::read(read)?.into())
419 }
420 marker => return Err(UnexpectedMarker(marker).into()),
421 })
422 }
423
424 fn do_peek_marker(read: &mut BufReader<R>) -> io::Result<Marker> {
425 let marker = read.fill_buf()?.first().ok_or(EofOnMarker)?;
426
427 Ok(Marker::from_u8(*marker))
428 }
429}
430
431pub fn priority_num(level: Level) -> u64 {
432 match level {
433 Level::TRACE => 0,
434 Level::DEBUG => 1,
435 Level::INFO => 2,
436 Level::WARN => 3,
437 Level::ERROR => 4,
438 }
439}
440
441pub fn num_priority(num: u64) -> Level {
442 match num {
443 0 => Level::TRACE,
444 1 => Level::DEBUG,
445 2 => Level::INFO,
446 3 => Level::WARN,
447 4 => Level::ERROR,
448 _ => Level::ERROR,
449 }
450}
451
/// Wraps any displayable error in an `io::Error` of kind `InvalidInput`,
/// keeping only its message text.
fn decode_err<E: ToString>(error: E) -> io::Error {
    let message = error.to_string();
    io::Error::new(io::ErrorKind::InvalidInput, message)
}
455
456#[derive(thiserror::Error, Debug)]
457#[error("Unexpected type {0:?}")]
458pub struct UnexpectedMarker(Marker);
459impl From<UnexpectedMarker> for io::Error {
460 fn from(value: UnexpectedMarker) -> Self {
461 decode_err(value)
462 }
463}
464
465#[derive(thiserror::Error, Debug)]
466#[error("Expecting Msgpack Marker, got EOF")]
467pub struct EofOnMarker;
468impl From<EofOnMarker> for io::Error {
469 fn from(value: EofOnMarker) -> Self {
470 decode_err(value)
471 }
472}
473
474#[derive(thiserror::Error, Debug)]
475#[error("Span should not have value of zero")]
476pub struct ZeroSpan;
477impl From<ZeroSpan> for io::Error {
478 fn from(value: ZeroSpan) -> Self {
479 decode_err(value)
480 }
481}
482
483#[derive(thiserror::Error, Debug)]
484#[error("Trying to load cached instruction file into uncached machine")]
485pub struct UnexpectedCached;
486impl From<UnexpectedCached> for io::Error {
487 fn from(value: UnexpectedCached) -> Self {
488 decode_err(value)
489 }
490}
491
/// Index of a cached string, held as the raw bytes that follow the marker
/// byte of a MessagePack fixext item.
///
/// The smallest variant able to hold the value is chosen when converting
/// from `u64`; all values are little-endian.
#[derive(Clone, Copy)]
pub enum CacheIndex {
    /// Values below 2^16: two little-endian bytes.
    U16 { data: [u8; 2] },
    /// Values below 2^24: three little-endian bytes.
    U24 { data: [u8; 3] },
    /// Values below 2^40: five little-endian bytes.
    U40 { data: [u8; 5] },
    /// Any other value: one leading byte (written as `0`, ignored when
    /// converting back) followed by the eight little-endian bytes.
    U64 { data: [u8; 9] },
}

impl From<CacheIndex> for u64 {
    /// Rebuilds the numeric index by zero-extending the stored bytes.
    fn from(value: CacheIndex) -> Self {
        let mut le = [0u8; 8];
        match value {
            CacheIndex::U16 { data } => le[..2].copy_from_slice(&data),
            CacheIndex::U24 { data } => le[..3].copy_from_slice(&data),
            CacheIndex::U40 { data } => le[..5].copy_from_slice(&data),
            // The first byte is not part of the value.
            CacheIndex::U64 { data } => le.copy_from_slice(&data[1..]),
        }
        u64::from_le_bytes(le)
    }
}

impl From<u64> for CacheIndex {
    /// Picks the narrowest variant whose byte count can represent `value`.
    fn from(value: u64) -> Self {
        let le = value.to_le_bytes();
        if value < (1u64 << 16) {
            CacheIndex::U16 {
                data: [le[0], le[1]],
            }
        } else if value < (1u64 << 24) {
            CacheIndex::U24 {
                data: [le[0], le[1], le[2]],
            }
        } else if value < (1u64 << 40) {
            CacheIndex::U40 {
                data: [le[0], le[1], le[2], le[3], le[4]],
            }
        } else {
            let mut data = [0u8; 9];
            data[1..].copy_from_slice(&le);
            CacheIndex::U64 { data }
        }
    }
}
536impl CacheIndex {
537 pub fn marker(self) -> Marker {
538 match self {
539 CacheIndex::U16 { .. } => Marker::FixExt1,
540 CacheIndex::U24 { .. } => Marker::FixExt2,
541 CacheIndex::U40 { .. } => Marker::FixExt4,
542 CacheIndex::U64 { .. } => Marker::FixExt8,
543 }
544 }
545
546 pub fn data(&self) -> &[u8] {
547 match self {
548 CacheIndex::U16 { data } => data.as_slice(),
549 CacheIndex::U24 { data } => data.as_slice(),
550 CacheIndex::U40 { data } => data.as_slice(),
551 CacheIndex::U64 { data } => data.as_slice(),
552 }
553 }
554
555 pub fn data_mut(&mut self) -> &mut [u8] {
556 match self {
557 CacheIndex::U16 { data } => data.as_mut_slice(),
558 CacheIndex::U24 { data } => data.as_mut_slice(),
559 CacheIndex::U40 { data } => data.as_mut_slice(),
560 CacheIndex::U64 { data } => data.as_mut_slice(),
561 }
562 }
563
564 pub fn write<W>(self, mut write: W) -> io::Result<()>
565 where
566 W: io::Write,
567 {
568 write.write_all(&[self.marker().to_u8()])?;
569 write.write_all(self.data())?;
570 Ok(())
571 }
572
573 pub fn read<R>(mut read: R) -> io::Result<Self>
574 where
575 R: io::Read,
576 {
577 let mut marker = [0];
578 read.read_exact(&mut marker)?;
579 let marker = Marker::from_u8(marker[0]);
580
581 let mut r = match marker {
582 Marker::FixExt1 => CacheIndex::U16 {
583 data: Default::default(),
584 },
585 Marker::FixExt2 => CacheIndex::U24 {
586 data: Default::default(),
587 },
588 Marker::FixExt4 => CacheIndex::U40 {
589 data: Default::default(),
590 },
591 Marker::FixExt8 => CacheIndex::U64 {
592 data: Default::default(),
593 },
594 marker => return Err(UnexpectedMarker(marker).into()),
595 };
596
597 read.read_exact(r.data_mut())?;
598
599 Ok(r)
600 }
601}