use std::{
    borrow::Cow,
    fs::File,
    hint,
    io::Write,
    path::Path,
    sync::{
        atomic::{AtomicPtr, AtomicU64, Ordering},
        Arc,
    },
    time::Instant,
};

use tracing::{
    span::{Attributes, Id},
    Subscriber,
};
use tracing_subscriber::registry::LookupSpan;
use tracing_tape::{
    intro::Intro,
    record::{
        field_type, parent_kind, CallsiteFieldRecord, CallsiteRecord, EventRecord,
        EventValueRecord, SpanCloseRecord, SpanEnterRecord, SpanExitRecord, SpanFollowsRecord,
        SpanOpenRecord2, SpanValueRecord,
    },
};
use zerocopy::AsBytes;

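/// One of the two fixed-size write buffers the recorder ping-pongs between.
///
/// `chapter_index` identifies which slot of the output file this buffer
/// currently backs. Writers copy records in via `byte_range_mut` and then bump
/// `bytes_written`; once a chapter is full (or the recorder shuts down),
/// `finish` flushes it to disk and republishes the buffer under a new index.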
#[derive(Debug)]
struct Chapter {
    chapter_size: usize,
    chapter_index: AtomicU64,
    data_offset: AtomicU64,
    bytes_written: AtomicU64,
    buffer: AtomicPtr<u8>,
}

impl Chapter {
    fn new(chapter_size: usize, chapter_index: u64) -> Self {
        let layout = std::alloc::Layout::from_size_align(chapter_size, 16).unwrap();
        // SAFETY: `layout` has a non-zero size; a null return means the
        // allocation failed, which we surface instead of dereferencing null.
        let buffer = unsafe { std::alloc::alloc(layout) };
        if buffer.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        Self {
            chapter_size,
            chapter_index: AtomicU64::new(chapter_index),
            buffer: AtomicPtr::new(buffer),
            bytes_written: AtomicU64::new(0),
            data_offset: AtomicU64::new(0),
        }
    }

    unsafe fn as_bytes(&self) -> &[u8] {
        std::slice::from_raw_parts(self.buffer.load(Ordering::Relaxed), self.chapter_size)
    }

    unsafe fn byte_range_mut(&self, offset: usize, len: usize) -> &mut [u8] {
        std::slice::from_raw_parts_mut(self.buffer.load(Ordering::Relaxed).add(offset), len)
    }

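    /// Flushes this chapter to `file` and republishes the buffer under
    /// `next_chapter_index`.
    ///
    /// Zero-fills the unused tail, spins until every in-flight writer has
    /// finished copying its record (`bytes_written` catches up with the bytes
    /// claimed up to `end_offset`), writes the buffer at this chapter's file
    /// offset, and finally stores `next_chapter_index` with `Release` so
    /// writers waiting in `TapeRecorderInner::chapter` can proceed.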
    #[cold]
    fn finish(&self, file: &File, end_offset: u64, next_chapter_index: u64) {
        // Zero the unused tail so the on-disk chapter has a deterministic size.
        unsafe {
            self.byte_range_mut(end_offset as usize, self.chapter_size - end_offset as usize)
        }
        .fill(0);
        let data_offset = self.data_offset.load(Ordering::Relaxed);
        let expected_bytes_written = end_offset - data_offset;

        // Spin until every concurrent writer has finished copying its record
        // into this chapter.
        while self.bytes_written.load(Ordering::Acquire) != expected_bytes_written {
            hint::spin_loop();
        }

        let offset = self.offset();
        let data = unsafe { self.as_bytes() };

        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.write_all_at(data, offset).unwrap();
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;
            let mut offset = offset;
            let mut data = data;
            while !data.is_empty() {
                let bytes_written = file.seek_write(data, offset).unwrap();
                data = &data[bytes_written..];
                offset += bytes_written as u64;
            }
        }

        self.bytes_written.store(0, Ordering::Relaxed);
        self.data_offset.store(u64::MAX, Ordering::Relaxed);
        self.chapter_index
            .store(next_chapter_index, Ordering::Release);
    }

    fn offset(&self) -> u64 {
        INTRO_SIZE as u64 + self.chapter_index.load(Ordering::Relaxed) * self.chapter_size as u64
    }
}

impl Drop for Chapter {
    fn drop(&mut self) {
        unsafe {
            std::alloc::dealloc(
                self.buffer.load(Ordering::Relaxed),
                std::alloc::Layout::from_size_align(self.chapter_size, 16).unwrap(),
            );
        }
    }
}

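/// Shared recorder state behind the `TapeRecorder` handle.
///
/// `offset` is the global logical write cursor. `chapter_size` is a power of
/// two, so a logical offset splits into a chapter index
/// (`offset >> chapter_size_pot`) and an offset within the chapter
/// (`offset & chapter_offset_mask`); for the default 1 MiB chapters,
/// `chapter_size_pot == 20` and `chapter_offset_mask == 0xF_FFFF`. Even and
/// odd chapter indices map to the two entries of `chapters`, giving a
/// lock-free double buffer.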
#[derive(Debug)]
struct TapeRecorderInner {
    file: File,
    offset: AtomicU64,
    init_instant: Instant,

    chapter_size: u32,
    chapter_size_pot: u8,
    chapter_offset_mask: u64,
    chapters: [Chapter; 2],
    random_state: ahash::RandomState,
}

impl Drop for TapeRecorderInner {
    fn drop(&mut self) {
        // Flush the partially filled final chapter on shutdown.
        let offset = self.offset.load(Ordering::Relaxed);
        let chapter_bytes = offset & self.chapter_offset_mask;
        let chapter_index = self.chapter_index(offset);
        let chapter = self.chapter(chapter_index);
        chapter.finish(&self.file, chapter_bytes, chapter_index + 2);
    }
}

impl TapeRecorderInner {
    #[inline]
    fn elapsed_nanos(&self) -> i64 {
        self.init_instant.elapsed().as_nanos() as i64
    }

    #[inline]
    fn chapter_index(&self, offset: u64) -> u64 {
        offset >> self.chapter_size_pot
    }

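    /// Returns the chapter backing `chapter_index`, spinning until any
    /// concurrent `finish` has flushed the buffer and republished it under the
    /// requested index.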
    #[inline]
    fn chapter(&self, chapter_index: u64) -> &Chapter {
        let chapter = &self.chapters[(chapter_index & 1) as usize];
        while chapter.chapter_index.load(Ordering::Acquire) != chapter_index {
            hint::spin_loop();
        }
        chapter
    }

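    /// Reserves `size` bytes at the logical write cursor and lets `f` fill
    /// them in place.
    ///
    /// The common case is a single `fetch_add` plus a copy into the current
    /// chapter. When the reservation crosses a chapter boundary, the current
    /// chapter is flushed, the abandoned head of the next chapter is zeroed,
    /// and the write is retried with a fresh reservation.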
    #[inline]
    fn write<F: Fn(&mut [u8])>(&self, size: usize, f: F) {
        if size > (self.chapter_size as usize) >> 2 {
            panic!("record too large");
        }

        // Claim a contiguous logical range with a single atomic add.
        let data_start = self.offset.fetch_add(size as u64, Ordering::Relaxed);
        let data_end = data_start + size as u64;

        let data_start_chapter = self.chapter_index(data_start);
        let data_end_chapter = self.chapter_index(data_end - 1);
        let chapter = self.chapter(data_start_chapter);
        let chapter_offset = data_start & self.chapter_offset_mask;

        if data_start_chapter == data_end_chapter {
            // The record fits entirely within the current chapter.
            f(unsafe { chapter.byte_range_mut(chapter_offset as usize, size) });

            chapter
                .bytes_written
                .fetch_add(size as u64, Ordering::Release);

            if data_end & self.chapter_offset_mask == 0 {
                // This write filled the chapter exactly: flush it and prepare
                // the next one.
                chapter.finish(&self.file, self.chapter_size as u64, data_start_chapter + 2);

                let next_chapter = self.chapter(data_start_chapter + 1);
                next_chapter.data_offset.store(0, Ordering::Relaxed);
            }
        } else {
            // The reservation straddles a chapter boundary: flush the current
            // chapter, zero the abandoned tail of the reservation at the head
            // of the next chapter, and retry with a fresh reservation.
            chapter.finish(&self.file, chapter_offset, data_start_chapter + 2);
            let next_chapter = self.chapter(data_start_chapter + 1);
            let next_chapter_offset = data_end & self.chapter_offset_mask;
            next_chapter
                .data_offset
                .store(next_chapter_offset, Ordering::Relaxed);
            unsafe { next_chapter.byte_range_mut(0, next_chapter_offset as usize) }.fill(0);
            self.write(size, f);
        }
    }
}

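/// A `tracing-subscriber` layer that records callsites, spans, and events to a
/// `.tape` file through two lock-free, double-buffered chapters.
///
/// A minimal usage sketch; the `tracing_tape_recorder` import path below is
/// illustrative and depends on how this crate is actually named:
///
/// ```ignore
/// use tracing_subscriber::layer::SubscriberExt;
/// use tracing_tape_recorder::TapeRecorder;
///
/// // Writes `{exe_name}_{timestamp}.tape` in the current working directory.
/// let subscriber = tracing_subscriber::registry().with(TapeRecorder::default());
/// tracing::subscriber::set_global_default(subscriber).expect("failed to set subscriber");
///
/// tracing::info!(answer = 42, "recorded to the tape file");
/// ```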
#[derive(Debug, Clone)]
pub struct TapeRecorder {
    inner: Arc<TapeRecorderInner>,
}

const INTRO_SIZE: usize = std::mem::size_of::<Intro>();

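/// Creates a recorder writing to `{exe_name}_{local_time}.tape`, falling back
/// to `trace`, UTC, or the raw Unix timestamp when the executable name, the
/// local offset, or the time formatting are unavailable.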
impl Default for TapeRecorder {
    fn default() -> Self {
        let exe = std::env::current_exe().ok();
        let name = exe
            .as_ref()
            .and_then(|path| path.file_name())
            .map(|name| name.to_string_lossy())
            .unwrap_or(Cow::Borrowed("trace"));

        let time = time::OffsetDateTime::now_local()
            .ok()
            .unwrap_or_else(time::OffsetDateTime::now_utc);
        let format = time::macros::format_description!(
            "[year]-[month]-[day]_[weekday repr:short]_[hour]-[minute]-[second]"
        );

        let time_format = time
            .format(&format)
            .ok()
            .unwrap_or_else(|| time.unix_timestamp().to_string());

        let file_path = format!("{}_{}.tape", name, time_format);
        Self::with_file(file_path).unwrap()
    }
}

impl TapeRecorder {
    fn with_file<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
        let mut file = File::create_new(path)?;

        let now_system = time::OffsetDateTime::now_local()
            .ok()
            .unwrap_or_else(time::OffsetDateTime::now_utc);
        let now_instant = Instant::now();

        let chapter_size: u32 = 1024 * 1024;
        let chapter_size_pot = chapter_size.ilog2() as u8;

        let intro = Intro::new(chapter_size_pot, now_system.unix_timestamp_nanos());
        file.write_all(intro.as_bytes())?;

        Ok(Self {
            inner: Arc::new(TapeRecorderInner {
                file,
                offset: AtomicU64::new(0),
                init_instant: now_instant,

                chapter_size,
                chapter_size_pot,
                chapter_offset_mask: chapter_size as u64 - 1,
                chapters: [
                    Chapter::new(chapter_size as usize, 0),
                    Chapter::new(chapter_size as usize, 1),
                ],
                random_state: Default::default(),
            }),
        })
    }
}

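/// `tracing::field::Visit` implementation that serializes each event field as
/// an `EventValueRecord` header followed by the raw value bytes, keyed by the
/// hash of the field name and the recording thread.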
struct EventValueRecorder<'a> {
    recorder: &'a TapeRecorderInner,
    thread_id: u64,
}

impl EventValueRecorder<'_> {
    fn record_value(&self, field: &tracing::field::Field, kind: u8, value: &[u8]) {
        let field_id = self.recorder.random_state.hash_one(field.name());
        let record = EventValueRecord::new(field_id, kind, value.len(), self.thread_id);
        self.recorder
            .write(std::mem::size_of_val(&record) + value.len(), |slice| {
                let mut cursor = std::io::Cursor::new(slice);
                cursor.write_all(record.as_bytes()).unwrap();
                cursor.write_all(value).unwrap();
            });
    }
}

impl tracing::field::Visit for EventValueRecorder<'_> {
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        let string = format!("{value:?}");
        self.record_value(field, field_type::STR, string.as_bytes());
    }

    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.record_value(field, field_type::F64, &value.to_le_bytes());
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.record_value(field, field_type::I64, &value.to_le_bytes());
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.record_value(field, field_type::U64, &value.to_le_bytes());
    }

    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
        self.record_value(field, field_type::I128, &value.to_le_bytes());
    }

    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
        self.record_value(field, field_type::U128, &value.to_le_bytes());
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.record_value(field, field_type::BOOL, &[value as u8]);
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.record_value(field, field_type::STR, value.as_bytes());
    }

    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        let string = value.to_string();
        self.record_value(field, field_type::ERROR, string.as_bytes());
    }
}

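/// `tracing::field::Visit` implementation that serializes each span field as a
/// `SpanValueRecord` header followed by the raw value bytes, keyed by the hash
/// of the field name and the (hashed) span id.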
struct SpanValueRecorder<'a> {
    recorder: &'a TapeRecorderInner,
    span_id: u64,
}

impl SpanValueRecorder<'_> {
    fn record_value(&self, field: &tracing::field::Field, kind: u8, value: &[u8]) {
        let field_id = self.recorder.random_state.hash_one(field.name());
        let record = SpanValueRecord::new(field_id, kind, value.len(), self.span_id);
        self.recorder
            .write(std::mem::size_of_val(&record) + value.len(), |slice| {
                let mut cursor = std::io::Cursor::new(slice);
                cursor.write_all(record.as_bytes()).unwrap();
                cursor.write_all(value).unwrap();
            });
    }
}

impl tracing::field::Visit for SpanValueRecorder<'_> {
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        let string = format!("{value:?}");
        self.record_value(field, field_type::STR, string.as_bytes());
    }

    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.record_value(field, field_type::F64, &value.to_le_bytes());
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.record_value(field, field_type::I64, &value.to_le_bytes());
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.record_value(field, field_type::U64, &value.to_le_bytes());
    }

    fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
        self.record_value(field, field_type::I128, &value.to_le_bytes());
    }

    fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
        self.record_value(field, field_type::U128, &value.to_le_bytes());
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.record_value(field, field_type::BOOL, &[value as u8]);
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.record_value(field, field_type::STR, value.as_bytes());
    }

    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        let string = value.to_string();
        self.record_value(field, field_type::ERROR, string.as_bytes());
    }
}

impl<S> tracing_subscriber::Layer<S> for TapeRecorder
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
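    /// Emits a `CallsiteRecord` followed by the name, target, module path, and
    /// file strings, then one `CallsiteFieldRecord` per field. Callsites and
    /// fields are identified by `ahash` hashes of the callsite identifier and
    /// field name, so later records can reference them by a fixed-size id.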
    fn register_callsite(
        &self,
        metadata: &'static tracing::Metadata<'static>,
    ) -> tracing::subscriber::Interest {
        let id = self.inner.random_state.hash_one(metadata.callsite());

        let module_path = metadata.module_path().unwrap_or("");
        let file = metadata.file().unwrap_or("");

        let name_len = metadata.name().len();
        let target_len = metadata.target().len();
        let module_path_len = module_path.len();
        let file_len = file.len();

        let record_len = std::mem::size_of::<CallsiteRecord>()
            + name_len
            + target_len
            + module_path_len
            + file_len;

        let callsite_record = CallsiteRecord::new(
            record_len as u16,
            if metadata.is_span() {
                tracing::metadata::Kind::SPAN
            } else {
                tracing::metadata::Kind::EVENT
            },
            *metadata.level(),
            metadata.fields().len() as u16,
            name_len as u16,
            target_len as u16,
            module_path_len as u16,
            file_len as u16,
            metadata.line().unwrap_or(0),
            id,
        );

        self.inner.write(record_len, |slice| {
            let mut cursor = std::io::Cursor::new(slice);
            cursor.write_all(callsite_record.as_bytes()).unwrap();
            cursor.write_all(metadata.name().as_bytes()).unwrap();
            cursor.write_all(metadata.target().as_bytes()).unwrap();
            cursor.write_all(module_path.as_bytes()).unwrap();
            cursor.write_all(file.as_bytes()).unwrap();
        });

        for field in metadata.fields() {
            let field_record = CallsiteFieldRecord::new(
                field.name().len() as u16,
                id,
                self.inner.random_state.hash_one(field.name()),
            );
            self.inner
                .write(field_record.header.len.get() as usize, |slice| {
                    let mut cursor = std::io::Cursor::new(slice);
                    cursor.write_all(field_record.as_bytes()).unwrap();
                    cursor.write_all(field.name().as_bytes()).unwrap();
                });
        }

        tracing::subscriber::Interest::sometimes()
    }

    fn on_event(
        &self,
        event: &tracing::Event<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let timestamp = self.inner.elapsed_nanos();
        let callsite_id = self
            .inner
            .random_state
            .hash_one(event.metadata().callsite());
        let thread_id = self
            .inner
            .random_state
            .hash_one(std::thread::current().id());
        let event_record = EventRecord::new(
            event.metadata().fields().len() as u16,
            timestamp,
            callsite_id,
            thread_id,
        );

        self.inner
            .write(std::mem::size_of::<EventRecord>(), |slice| {
                slice.copy_from_slice(event_record.as_bytes());
            });
        let mut recorder = EventValueRecorder {
            recorder: &self.inner,
            thread_id,
        };
        event.record(&mut recorder);
    }

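    /// Records a `SpanOpenRecord2` carrying the span's parent relationship
    /// (explicit parent, contextual, or root), followed by the span's initial
    /// field values.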
    fn on_new_span(
        &self,
        attrs: &Attributes<'_>,
        id: &Id,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let timestamp = self.inner.elapsed_nanos();
        let id = self.inner.random_state.hash_one(id);
        let callsite_id = self
            .inner
            .random_state
            .hash_one(attrs.metadata().callsite());
        let (parent_kind, parent_id) = if let Some(parent) = attrs.parent() {
            (
                parent_kind::EXPLICIT,
                self.inner.random_state.hash_one(parent),
            )
        } else if attrs.is_contextual() {
            (parent_kind::CURRENT, 0)
        } else {
            (parent_kind::ROOT, 0)
        };
        let record = SpanOpenRecord2::new(id, parent_kind, parent_id, callsite_id, timestamp);
        self.inner.write(std::mem::size_of_val(&record), |slice| {
            slice.copy_from_slice(record.as_bytes());
        });
        let mut recorder = SpanValueRecorder {
            recorder: &self.inner,
            span_id: id,
        };
        attrs.record(&mut recorder);
    }

    fn on_enter(&self, id: &Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        let timestamp = self.inner.elapsed_nanos();
        let id = self.inner.random_state.hash_one(id);
        let thread_id = self
            .inner
            .random_state
            .hash_one(std::thread::current().id());

        let record = SpanEnterRecord::new(id, timestamp, thread_id);
        self.inner.write(std::mem::size_of_val(&record), |slice| {
            slice.copy_from_slice(record.as_bytes());
        });
    }

    fn on_exit(&self, id: &Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        let timestamp = self.inner.elapsed_nanos();
        let id = self.inner.random_state.hash_one(id);

        let record = SpanExitRecord::new(id, timestamp);
        self.inner.write(std::mem::size_of_val(&record), |slice| {
            slice.copy_from_slice(record.as_bytes());
        });
    }

    fn on_close(&self, id: Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        let timestamp = self.inner.elapsed_nanos();
        let id = self.inner.random_state.hash_one(id);

        let record = SpanCloseRecord::new(id, timestamp);
        self.inner.write(std::mem::size_of_val(&record), |slice| {
            slice.copy_from_slice(record.as_bytes());
        });
    }

    fn on_record(
        &self,
        id: &Id,
        values: &tracing::span::Record<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let id = self.inner.random_state.hash_one(id);
        let mut recorder = SpanValueRecorder {
            recorder: &self.inner,
            span_id: id,
        };
        values.record(&mut recorder);
    }

    fn on_follows_from(
        &self,
        id: &Id,
        follows: &Id,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let id = self.inner.random_state.hash_one(id);
        let follows = self.inner.random_state.hash_one(follows);

        let record = SpanFollowsRecord::new(id, follows);
        self.inner.write(std::mem::size_of_val(&record), |slice| {
            slice.copy_from_slice(record.as_bytes());
        });
    }
}