dynamo_llm/
recorder.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
3
4use serde::{Deserialize, Serialize};
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::fs::{self, File, OpenOptions};
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
11use tokio::sync::{Mutex, mpsc};
12use tokio_util::sync::CancellationToken;
13
14/// Record entry that will be serialized to JSONL
15#[derive(Serialize, Deserialize)]
16struct RecordEntry<T>
17where
18    T: Clone,
19{
20    timestamp: u64,
21    event: T,
22}
23
24/// A generic recorder for events that streams directly to a JSONL file
25#[derive(Debug)]
26pub struct Recorder<T> {
27    /// A sender for events that can be cloned and shared with producers
28    event_tx: mpsc::Sender<T>,
29    /// A cancellation token for managing shutdown
30    cancel: CancellationToken,
31    /// Counter for the number of events written
32    event_count: Arc<Mutex<usize>>,
33    /// Time when the first event was received
34    first_event_time: Arc<Mutex<Option<Instant>>>,
35}
36
37impl<T> Recorder<T>
38where
39    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
40{
41    /// Create a new Recorder that streams events directly to a JSONL file
42    ///
43    /// ### Arguments
44    ///
45    /// * `token` - A cancellation token for managing shutdown
46    /// * `output_path` - Path to the JSONL file to write events to
47    /// * `max_lines_per_file` - Maximum number of lines per file before rotating to a new file.
48    ///   If None, no rotation will occur.
49    /// * `max_count` - Maximum number of events to record before shutting down.
50    ///   If None, no limit will be applied.
51    /// * `max_time` - Maximum duration in seconds to record before shutting down.
52    ///   If None, no time limit will be applied.
53    ///
54    /// ### Returns
55    ///
56    /// A Result with a new Recorder that streams events to the specified file
57    pub async fn new<P: AsRef<Path>>(
58        token: CancellationToken,
59        output_path: P,
60        max_lines_per_file: Option<usize>,
61        max_count: Option<usize>,
62        max_time: Option<f64>,
63    ) -> io::Result<Self> {
64        let (event_tx, mut event_rx) = mpsc::channel::<T>(2048);
65        let event_count = Arc::new(Mutex::new(0));
66        let event_count_clone = event_count.clone();
67        let cancel_clone = token.clone();
68        let start_time = Instant::now();
69        let first_event_time = Arc::new(Mutex::new(None));
70        let first_event_time_clone = first_event_time.clone();
71
72        // Ensure the directory exists
73        if let Some(parent) = output_path.as_ref().parent()
74            && !parent.exists()
75        {
76            fs::create_dir_all(parent).await?;
77        }
78
79        // Create the file for writing
80        let file = OpenOptions::new()
81            .create(true)
82            .write(true)
83            .truncate(true)
84            .open(&output_path)
85            .await?;
86
87        let file_path = output_path.as_ref().to_path_buf();
88
89        // Spawn a task to receive events and write them to the file
90        tokio::spawn(async move {
91            let start_time = start_time;
92            let mut writer = BufWriter::with_capacity(32768, file);
93            let mut line_count = 0;
94            let mut file_index = 0;
95            let base_path = file_path.clone();
96
97            // Set up max time deadline if specified
98            let max_time_deadline = max_time.map(|secs| {
99                let duration = Duration::from_secs_f64(secs);
100                start_time + duration
101            });
102
103            loop {
104                // Check time limit if set
105                if let Some(deadline) = max_time_deadline
106                    && Instant::now() >= deadline
107                {
108                    tracing::info!("Recorder reached max time limit, shutting down");
109                    // Flush and cancel
110                    if let Err(e) = writer.flush().await {
111                        tracing::error!("Failed to flush on time limit shutdown: {}", e);
112                    }
113                    cancel_clone.cancel();
114                    return;
115                }
116
117                tokio::select! {
118                    biased;
119
120                    _ = cancel_clone.cancelled() => {
121                        // Flush any pending writes before shutting down
122                        if let Err(e) = writer.flush().await {
123                            tracing::error!("Failed to flush on shutdown: {}", e);
124                        }
125
126                        tracing::debug!("Recorder task shutting down");
127                        return;
128                    }
129
130                    Some(event) = event_rx.recv() => {
131                        // Update first_event_time if this is the first event
132                        {
133                            let mut first_time = first_event_time_clone.lock().await;
134                            if first_time.is_none() {
135                                *first_time = Some(Instant::now());
136                            }
137                        }
138
139                        // Calculate elapsed time in milliseconds
140                        let elapsed_ms = start_time.elapsed().as_millis() as u64;
141
142                        // Create the record entry
143                        let entry = RecordEntry {
144                            timestamp: elapsed_ms,
145                            event,
146                        };
147
148                        // Serialize to JSON string
149                        let json = match serde_json::to_string(&entry) {
150                            Ok(json) => json,
151                            Err(e) => {
152                                tracing::error!("Failed to serialize event: {}", e);
153                                continue;
154                            }
155                        };
156
157                        // Write JSON line
158                        if let Err(e) = writer.write_all(json.as_bytes()).await {
159                            tracing::error!("Failed to write event: {}", e);
160                            continue;
161                        }
162
163                        // Add a newline
164                        if let Err(e) = writer.write_all(b"\n").await {
165                            tracing::error!("Failed to write newline: {}", e);
166                            continue;
167                        }
168
169                        // Increment line count
170                        line_count += 1;
171
172                        // Check if we need to rotate to a new file
173                        if let Some(max_lines) = max_lines_per_file
174                            && line_count >= max_lines {
175                                // Flush the current file
176                                if let Err(e) = writer.flush().await {
177                                    tracing::error!("Failed to flush file before rotation: {}", e);
178                                }
179
180                                // Create new filename with suffix
181                                file_index += 1;
182                                let new_path = create_rotated_path(&base_path, file_index);
183
184                                // Open new file
185                                match OpenOptions::new()
186                                    .create(true)
187                                    .write(true)
188                                    .truncate(true)
189                                    .open(&new_path)
190                                    .await
191                                {
192                                    Ok(new_file) => {
193                                        writer = BufWriter::with_capacity(32768, new_file);
194                                        line_count = 0;
195                                        tracing::info!("Rotated to new file: {}", new_path.display());
196                                    },
197                                    Err(e) => {
198                                        tracing::error!("Failed to open rotated file {}: {}", new_path.display(), e);
199                                        // Continue with the existing file if rotation fails
200                                    }
201                                }
202                            }
203
204                        // Update event count
205                        let mut count = event_count_clone.lock().await;
206                        *count += 1;
207
208                        // Check if we've reached the maximum count
209                        if let Some(max) = max_count
210                            && *count >= max {
211                                tracing::info!("Recorder reached max event count ({}), shutting down", max);
212                                // Flush buffer before shutting down
213                                if let Err(e) = writer.flush().await {
214                                    tracing::error!("Failed to flush on count limit shutdown: {}", e);
215                                }
216                                // Drop the lock before cancelling
217                                drop(count);
218                                cancel_clone.cancel();
219                                return;
220                            }
221                    }
222                }
223            }
224        });
225
226        Ok(Self {
227            event_tx,
228            cancel: token,
229            event_count,
230            first_event_time,
231        })
232    }
233
234    /// Get a sender that can be used to send events to the recorder
235    pub fn event_sender(&self) -> mpsc::Sender<T> {
236        self.event_tx.clone()
237    }
238
239    /// Get the count of recorded events
240    pub async fn event_count(&self) -> usize {
241        *self.event_count.lock().await
242    }
243
244    /// Get the elapsed time since the first event was received
245    ///
246    /// Returns a Result with the elapsed time or an error if no events have been received yet
247    pub async fn elapsed_time(&self) -> io::Result<Duration> {
248        let first_time = self.first_event_time.lock().await;
249        match *first_time {
250            Some(time) => Ok(time.elapsed()),
251            None => Err(io::Error::other("No events received yet")),
252        }
253    }
254
255    /// Shutdown the recorder
256    pub fn shutdown(&self) {
257        self.cancel.cancel();
258    }
259
260    /// Send events from a JSONL file to the provided event sender
261    ///
262    /// ### Arguments
263    ///
264    /// * `filename` - Path to the JSONL file to read events from
265    /// * `event_tx` - A sender for events
266    /// * `timed` - If true, events will be sent according to their recorded timestamps.
267    ///   If false, events will be sent as fast as possible without delay.
268    /// * `max_count` - Maximum number of events to send before stopping. If None, all events will be sent.
269    /// * `max_time` - Maximum duration in seconds to send events before stopping. If None, no time limit.
270    ///
271    /// ### Returns
272    ///
273    /// A Result indicating success or failure with the number of events sent
274    pub async fn send_events<P: AsRef<Path>>(
275        filename: P,
276        event_tx: &mpsc::Sender<T>,
277        timed: bool,
278        max_count: Option<usize>,
279        max_time: Option<f64>,
280    ) -> io::Result<usize> {
281        // Store the display name before using filename
282        let display_name = filename.as_ref().display().to_string();
283
284        // Check if file exists
285        if !filename.as_ref().exists() {
286            return Err(io::Error::new(
287                io::ErrorKind::NotFound,
288                format!("File not found: {}", display_name),
289            ));
290        }
291
292        // Set up start time and deadline if max_time is specified
293        let start_time = Instant::now();
294        let deadline = max_time.map(|secs| start_time + Duration::from_secs_f64(secs));
295
296        // Open the file for reading using tokio's async file I/O
297        let file = File::open(&filename).await?;
298        let reader = BufReader::with_capacity(32768, file);
299        let mut lines = reader.lines();
300
301        let mut count = 0;
302        let mut line_number = 0;
303        let mut prev_timestamp: Option<u64> = None;
304
305        // Read and send events line by line
306        while let Some(line) = lines.next_line().await? {
307            // Check if we've reached the maximum count
308            if let Some(max) = max_count
309                && count >= max
310            {
311                tracing::info!("Reached maximum event count ({}), stopping", max);
312                break;
313            }
314
315            // Check if we've exceeded the time limit
316            if let Some(end_time) = deadline
317                && Instant::now() >= end_time
318            {
319                tracing::info!("Reached maximum time limit, stopping");
320                break;
321            }
322
323            line_number += 1;
324
325            // Skip empty lines
326            if line.trim().is_empty() {
327                continue;
328            }
329
330            // Try to parse the JSON
331            let record: RecordEntry<T> = match serde_json::from_str(&line) {
332                Ok(record) => record,
333                Err(e) => {
334                    tracing::warn!(
335                        "Failed to parse JSON on line {}: {}. Skipping.",
336                        line_number,
337                        e
338                    );
339                    continue;
340                }
341            };
342
343            let timestamp = record.timestamp;
344            let event = record.event;
345
346            // Handle timing if needed
347            if timed
348                && let Some(prev) = prev_timestamp
349                && timestamp > prev
350            {
351                let wait_time = timestamp - prev;
352                tokio::time::sleep(Duration::from_millis(wait_time)).await;
353            }
354
355            // Send the event
356            event_tx
357                .send(event)
358                .await
359                .map_err(|e| io::Error::other(format!("Failed to send event: {e}")))?;
360
361            // Update previous timestamp and count
362            prev_timestamp = Some(timestamp);
363            count += 1;
364        }
365
366        if count == 0 {
367            tracing::warn!("No events to send from file: {}", display_name);
368        } else {
369            tracing::info!("Sent {} events from {}", count, display_name);
370        }
371
372        Ok(count)
373    }
374}
375
376impl<T> Drop for Recorder<T> {
377    fn drop(&mut self) {
378        self.cancel.cancel();
379    }
380}
381
/// Helper function to create a rotated file path with an index suffix
///
/// The index is inserted between the stem and the extension of the final path
/// component (`events.jsonl` -> `events1.jsonl`), or appended when the file name
/// has no extension (`events` -> `events1`). Only the final component is
/// inspected, so dots in directory names and the leading dot of hidden files are
/// left alone, and non-UTF-8 paths are preserved unchanged.
fn create_rotated_path(base_path: &Path, index: usize) -> PathBuf {
    let suffix = index.to_string();

    let stem = match base_path.file_stem() {
        Some(stem) => stem,
        None => {
            // No final file name component (e.g. an empty path or one ending
            // in `..`): the best we can do is append the index to the raw path.
            let mut raw = base_path.as_os_str().to_os_string();
            raw.push(&suffix);
            return PathBuf::from(raw);
        }
    };

    let mut rotated_name = stem.to_os_string();
    rotated_name.push(&suffix);
    if let Some(extension) = base_path.extension() {
        rotated_name.push(".");
        rotated_name.push(extension);
    }

    base_path.with_file_name(rotated_name)
}
395
396#[cfg(test)]
397mod tests {
398    use super::*;
399    use std::time::Duration;
400    use tempfile::tempdir;
401
402    // Type alias for the TestEvent recorder
403    type TestEventRecorder = Recorder<TestEvent>;
404
405    // More complex event type
406    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
407    struct TestEvent {
408        id: u64,
409        name: String,
410        values: Vec<i32>,
411    }
412
413    impl TestEvent {
414        // Helper method to generate a random test event
415        fn new(id: u64) -> Self {
416            // Generate a random number of values between 1 and 100
417            let num_values = rand::random_range(1..=100);
418
419            // Generate random values (integers between -100 and 100)
420            let values = (0..num_values)
421                .map(|_| rand::random_range(-100..=100))
422                .collect();
423
424            // Create a name based on the ID
425            let name = format!("event_{}", id);
426
427            TestEvent { id, name, values }
428        }
429
430        // Helper method to generate a vector of random events
431        fn generate_events(count: usize) -> Vec<Self> {
432            (0..count).map(|i| Self::new(i as u64)).collect()
433        }
434    }
435
436    #[tokio::test]
437    async fn test_recorder_streams_events_to_file() {
438        // Create a temporary directory for output files
439        let dir = tempdir().unwrap();
440        let file_path = dir.path().join("events.jsonl");
441
442        let token = CancellationToken::new();
443        let recorder = TestEventRecorder::new(token.clone(), &file_path, None, None, None)
444            .await
445            .unwrap();
446        let event_tx = recorder.event_sender();
447
448        // Create test events using generate_events
449        let events = TestEvent::generate_events(2);
450        let event1 = events[0].clone();
451        let event2 = events[1].clone();
452
453        // Wait some time before the first event
454        tokio::time::sleep(Duration::from_millis(10)).await;
455
456        // Send the events
457        for event in &events {
458            event_tx.send(event.clone()).await.unwrap();
459        }
460
461        // Allow some time for processing
462        tokio::time::sleep(Duration::from_millis(10)).await;
463
464        // Check that both events were recorded
465        assert_eq!(recorder.event_count().await, 2);
466
467        // Check that the elapsed time is between 7 and 13 milliseconds
468        let elapsed_ms = recorder.elapsed_time().await.unwrap().as_millis();
469        if !(7..=13).contains(&elapsed_ms) {
470            println!("Actual elapsed time: {} ms", elapsed_ms);
471            assert!((7..=13).contains(&elapsed_ms));
472        }
473
474        // Force shutdown to flush file
475        recorder.shutdown();
476        tokio::time::sleep(Duration::from_millis(10)).await;
477
478        // Read the file and verify content
479        let content = fs::read_to_string(&file_path).await.unwrap();
480        let lines: Vec<&str> = content.lines().collect();
481
482        // Print the content of the JSONL file
483        println!("JSONL file content:");
484        for (i, line) in lines.iter().enumerate() {
485            println!("Line {}: {}", i + 1, line);
486        }
487
488        assert_eq!(lines.len(), 2, "Expected 2 lines in the file");
489
490        // Parse the lines to verify events
491        let entry1: RecordEntry<TestEvent> = serde_json::from_str(lines[0]).unwrap();
492        let entry2: RecordEntry<TestEvent> = serde_json::from_str(lines[1]).unwrap();
493
494        assert_eq!(entry1.event, event1);
495        assert_eq!(entry2.event, event2);
496        assert!(entry2.timestamp >= entry1.timestamp);
497    }
498
499    #[ignore]
500    #[tokio::test]
501    async fn load_test_100k_events() {
502        // Create a temporary directory for output files
503        let dir = tempdir().unwrap();
504        let file_path = dir.path().join("events.jsonl");
505
506        // Create a cancellation token for the recorder
507        let token = CancellationToken::new();
508
509        // Set max lines per file to 10,000 (should create 10 files total)
510        const MAX_LINES_PER_FILE: usize = 10_000;
511        let recorder = TestEventRecorder::new(
512            token.clone(),
513            &file_path,
514            Some(MAX_LINES_PER_FILE),
515            None,
516            None,
517        )
518        .await
519        .unwrap();
520        let event_tx = recorder.event_sender();
521
522        // Define number of events to generate
523        const NUM_EVENTS: usize = 100_000;
524        println!("Generating {} events...", NUM_EVENTS);
525
526        // Generate events using the helper method
527        let events = TestEvent::generate_events(NUM_EVENTS);
528
529        // Send events with progress reporting
530        for (i, event) in events.iter().enumerate() {
531            event_tx.send(event.clone()).await.unwrap();
532
533            // Print progress every 10,000 events
534            if i > 0 && i % 10_000 == 0 {
535                println!("Sent {} events...", i);
536            }
537        }
538
539        // Allow time for the recorder to process all events
540        println!("Waiting for events to be processed...");
541        tokio::time::sleep(Duration::from_millis(1000)).await;
542
543        // Verify that all events were recorded
544        let count = recorder.event_count().await;
545        println!("Recorded event count: {}", count);
546        assert_eq!(count, NUM_EVENTS);
547
548        // Force a clean shutdown to flush all pending writes
549        recorder.shutdown();
550        tokio::time::sleep(Duration::from_millis(100)).await;
551
552        // Check for the existence of all expected files
553        let base_file = file_path.clone();
554        let mut found_files = Vec::new();
555
556        // Check base file
557        if base_file.exists() {
558            found_files.push(base_file.clone());
559        }
560
561        // Check rotated files (1-9)
562        for i in 1..=9 {
563            let rotated_path = create_rotated_path(&base_file, i);
564            if rotated_path.exists() {
565                found_files.push(rotated_path);
566            }
567        }
568
569        // Check that we have exactly 10 files
570        assert_eq!(
571            found_files.len(),
572            10,
573            "Expected 10 files due to rotation with 10k events each"
574        );
575
576        // Add more stringent check for each file size
577        for (i, file_path) in found_files.iter().enumerate() {
578            let content = fs::read_to_string(file_path).await.unwrap();
579            let line_count = content.lines().count();
580
581            if i < found_files.len() - 1 {
582                // All files except the last one should have exactly MAX_LINES_PER_FILE lines
583                assert_eq!(
584                    line_count,
585                    MAX_LINES_PER_FILE,
586                    "File {} should contain exactly {} lines",
587                    file_path.display(),
588                    MAX_LINES_PER_FILE
589                );
590            } else {
591                // The last file might have fewer lines
592                assert!(
593                    line_count <= MAX_LINES_PER_FILE,
594                    "Last file should contain at most {} lines",
595                    MAX_LINES_PER_FILE
596                );
597            }
598        }
599
600        // Count total lines across all files
601        let mut total_lines = 0;
602
603        // Check that timestamps are weakly sorted within each file
604        for (i, file_path) in found_files.iter().enumerate() {
605            println!("Checking file {}: {}", i, file_path.display());
606
607            // Count lines in the file
608            let content = fs::read_to_string(file_path).await.unwrap();
609            let line_count = content.lines().count();
610
611            // Should have MAX_LINES_PER_FILE lines in each file (except maybe the last one)
612            if i < found_files.len() - 1 {
613                assert_eq!(
614                    line_count, MAX_LINES_PER_FILE,
615                    "Each file except possibly the last should have exactly MAX_LINES_PER_FILE lines"
616                );
617            }
618
619            total_lines += line_count;
620
621            // Check that timestamps are weakly sorted within each file
622            let file = File::open(file_path).await.unwrap();
623            let reader = BufReader::new(file);
624            let mut lines = reader.lines();
625
626            let mut prev_timestamp: Option<u64> = None;
627            let mut line_number = 0;
628            let mut unsorted_count = 0;
629
630            // Check timestamps in the file without loading everything into memory
631            while let Some(line) = lines.next_line().await.unwrap() {
632                line_number += 1;
633                let entry: RecordEntry<TestEvent> = serde_json::from_str(&line).unwrap();
634
635                if let Some(prev) = prev_timestamp
636                    && entry.timestamp < prev
637                {
638                    unsorted_count += 1;
639                    if unsorted_count <= 5 {
640                        // Only log first 5 violations to avoid spam
641                        println!(
642                            "Timestamp order violation in file {} at line {}: {} < {}",
643                            file_path.display(),
644                            line_number,
645                            entry.timestamp,
646                            prev
647                        );
648                    }
649                }
650
651                prev_timestamp = Some(entry.timestamp);
652            }
653
654            assert_eq!(
655                unsorted_count, 0,
656                "Timestamps should be weakly sorted within each file"
657            );
658        }
659
660        assert_eq!(
661            total_lines, NUM_EVENTS,
662            "Total lines across all files should match NUM_EVENTS"
663        );
664
665        println!("Load test with file rotation completed successfully");
666    }
667}