dynamo_llm/
recorder.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use serde::{Deserialize, Serialize};
17use std::io;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::fs::{self, File, OpenOptions};
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
23use tokio::sync::{mpsc, Mutex};
24use tokio_util::sync::CancellationToken;
25use tracing as log;
26
27/// Record entry that will be serialized to JSONL
28#[derive(Serialize, Deserialize)]
29struct RecordEntry<T>
30where
31    T: Clone,
32{
33    timestamp: u64,
34    event: T,
35}
36
37/// A generic recorder for events that streams directly to a JSONL file
38pub struct Recorder<T> {
39    /// A sender for events that can be cloned and shared with producers
40    event_tx: mpsc::Sender<T>,
41    /// A cancellation token for managing shutdown
42    cancel: CancellationToken,
43    /// Counter for the number of events written
44    event_count: Arc<Mutex<usize>>,
45    /// Time when the first event was received
46    first_event_time: Arc<Mutex<Option<Instant>>>,
47}
48
49impl<T> Recorder<T>
50where
51    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
52{
53    /// Create a new Recorder that streams events directly to a JSONL file
54    ///
55    /// ### Arguments
56    ///
57    /// * `token` - A cancellation token for managing shutdown
58    /// * `output_path` - Path to the JSONL file to write events to
59    /// * `max_lines_per_file` - Maximum number of lines per file before rotating to a new file.
60    ///   If None, no rotation will occur.
61    /// * `max_count` - Maximum number of events to record before shutting down.
62    ///   If None, no limit will be applied.
63    /// * `max_time` - Maximum duration in seconds to record before shutting down.
64    ///   If None, no time limit will be applied.
65    ///
66    /// ### Returns
67    ///
68    /// A Result with a new Recorder that streams events to the specified file
69    pub async fn new<P: AsRef<Path>>(
70        token: CancellationToken,
71        output_path: P,
72        max_lines_per_file: Option<usize>,
73        max_count: Option<usize>,
74        max_time: Option<f64>,
75    ) -> io::Result<Self> {
76        let (event_tx, mut event_rx) = mpsc::channel::<T>(2048);
77        let event_count = Arc::new(Mutex::new(0));
78        let event_count_clone = event_count.clone();
79        let cancel_clone = token.clone();
80        let start_time = Instant::now();
81        let first_event_time = Arc::new(Mutex::new(None));
82        let first_event_time_clone = first_event_time.clone();
83
84        // Ensure the directory exists
85        if let Some(parent) = output_path.as_ref().parent() {
86            if !parent.exists() {
87                fs::create_dir_all(parent).await?;
88            }
89        }
90
91        // Create the file for writing
92        let file = OpenOptions::new()
93            .create(true)
94            .write(true)
95            .truncate(true)
96            .open(&output_path)
97            .await?;
98
99        let file_path = output_path.as_ref().to_path_buf();
100
101        // Spawn a task to receive events and write them to the file
102        tokio::spawn(async move {
103            let start_time = start_time;
104            let mut writer = BufWriter::with_capacity(32768, file);
105            let mut line_count = 0;
106            let mut file_index = 0;
107            let base_path = file_path.clone();
108
109            // Set up max time deadline if specified
110            let max_time_deadline = max_time.map(|secs| {
111                let duration = Duration::from_secs_f64(secs);
112                start_time + duration
113            });
114
115            loop {
116                // Check time limit if set
117                if let Some(deadline) = max_time_deadline {
118                    if Instant::now() >= deadline {
119                        log::info!("Recorder reached max time limit, shutting down");
120                        // Flush and cancel
121                        if let Err(e) = writer.flush().await {
122                            log::error!("Failed to flush on time limit shutdown: {}", e);
123                        }
124                        cancel_clone.cancel();
125                        return;
126                    }
127                }
128
129                tokio::select! {
130                    biased;
131
132                    _ = cancel_clone.cancelled() => {
133                        // Flush any pending writes before shutting down
134                        if let Err(e) = writer.flush().await {
135                            log::error!("Failed to flush on shutdown: {}", e);
136                        }
137
138                        log::debug!("Recorder task shutting down");
139                        return;
140                    }
141
142                    Some(event) = event_rx.recv() => {
143                        // Update first_event_time if this is the first event
144                        {
145                            let mut first_time = first_event_time_clone.lock().await;
146                            if first_time.is_none() {
147                                *first_time = Some(Instant::now());
148                            }
149                        }
150
151                        // Calculate elapsed time in milliseconds
152                        let elapsed_ms = start_time.elapsed().as_millis() as u64;
153
154                        // Create the record entry
155                        let entry = RecordEntry {
156                            timestamp: elapsed_ms,
157                            event,
158                        };
159
160                        // Serialize to JSON string
161                        let json = match serde_json::to_string(&entry) {
162                            Ok(json) => json,
163                            Err(e) => {
164                                log::error!("Failed to serialize event: {}", e);
165                                continue;
166                            }
167                        };
168
169                        // Write JSON line
170                        if let Err(e) = writer.write_all(json.as_bytes()).await {
171                            log::error!("Failed to write event: {}", e);
172                            continue;
173                        }
174
175                        // Add a newline
176                        if let Err(e) = writer.write_all(b"\n").await {
177                            log::error!("Failed to write newline: {}", e);
178                            continue;
179                        }
180
181                        // Increment line count
182                        line_count += 1;
183
184                        // Check if we need to rotate to a new file
185                        if let Some(max_lines) = max_lines_per_file {
186                            if line_count >= max_lines {
187                                // Flush the current file
188                                if let Err(e) = writer.flush().await {
189                                    log::error!("Failed to flush file before rotation: {}", e);
190                                }
191
192                                // Create new filename with suffix
193                                file_index += 1;
194                                let new_path = create_rotated_path(&base_path, file_index);
195
196                                // Open new file
197                                match OpenOptions::new()
198                                    .create(true)
199                                    .write(true)
200                                    .truncate(true)
201                                    .open(&new_path)
202                                    .await
203                                {
204                                    Ok(new_file) => {
205                                        writer = BufWriter::with_capacity(32768, new_file);
206                                        line_count = 0;
207                                        log::info!("Rotated to new file: {}", new_path.display());
208                                    },
209                                    Err(e) => {
210                                        log::error!("Failed to open rotated file {}: {}", new_path.display(), e);
211                                        // Continue with the existing file if rotation fails
212                                    }
213                                }
214                            }
215                        }
216
217                        // Update event count
218                        let mut count = event_count_clone.lock().await;
219                        *count += 1;
220
221                        // Check if we've reached the maximum count
222                        if let Some(max) = max_count {
223                            if *count >= max {
224                                log::info!("Recorder reached max event count ({}), shutting down", max);
225                                // Flush buffer before shutting down
226                                if let Err(e) = writer.flush().await {
227                                    log::error!("Failed to flush on count limit shutdown: {}", e);
228                                }
229                                // Drop the lock before cancelling
230                                drop(count);
231                                cancel_clone.cancel();
232                                return;
233                            }
234                        }
235                    }
236                }
237            }
238        });
239
240        Ok(Self {
241            event_tx,
242            cancel: token,
243            event_count,
244            first_event_time,
245        })
246    }
247
248    /// Get a sender that can be used to send events to the recorder
249    pub fn event_sender(&self) -> mpsc::Sender<T> {
250        self.event_tx.clone()
251    }
252
253    /// Get the count of recorded events
254    pub async fn event_count(&self) -> usize {
255        *self.event_count.lock().await
256    }
257
258    /// Get the elapsed time since the first event was received
259    ///
260    /// Returns a Result with the elapsed time or an error if no events have been received yet
261    pub async fn elapsed_time(&self) -> io::Result<Duration> {
262        let first_time = self.first_event_time.lock().await;
263        match *first_time {
264            Some(time) => Ok(time.elapsed()),
265            None => Err(io::Error::new(
266                io::ErrorKind::Other,
267                "No events received yet",
268            )),
269        }
270    }
271
272    /// Shutdown the recorder
273    pub fn shutdown(&self) {
274        self.cancel.cancel();
275    }
276
277    /// Send events from a JSONL file to the provided event sender
278    ///
279    /// ### Arguments
280    ///
281    /// * `filename` - Path to the JSONL file to read events from
282    /// * `event_tx` - A sender for events
283    /// * `timed` - If true, events will be sent according to their recorded timestamps.
284    ///   If false, events will be sent as fast as possible without delay.
285    /// * `max_count` - Maximum number of events to send before stopping. If None, all events will be sent.
286    /// * `max_time` - Maximum duration in seconds to send events before stopping. If None, no time limit.
287    ///
288    /// ### Returns
289    ///
290    /// A Result indicating success or failure with the number of events sent
291    pub async fn send_events<P: AsRef<Path>>(
292        filename: P,
293        event_tx: &mpsc::Sender<T>,
294        timed: bool,
295        max_count: Option<usize>,
296        max_time: Option<f64>,
297    ) -> io::Result<usize> {
298        // Store the display name before using filename
299        let display_name = filename.as_ref().display().to_string();
300
301        // Check if file exists
302        if !filename.as_ref().exists() {
303            return Err(io::Error::new(
304                io::ErrorKind::NotFound,
305                format!("File not found: {}", display_name),
306            ));
307        }
308
309        // Set up start time and deadline if max_time is specified
310        let start_time = Instant::now();
311        let deadline = max_time.map(|secs| start_time + Duration::from_secs_f64(secs));
312
313        // Open the file for reading using tokio's async file I/O
314        let file = File::open(&filename).await?;
315        let reader = BufReader::with_capacity(32768, file);
316        let mut lines = reader.lines();
317
318        let mut count = 0;
319        let mut line_number = 0;
320        let mut prev_timestamp: Option<u64> = None;
321
322        // Read and send events line by line
323        while let Some(line) = lines.next_line().await? {
324            // Check if we've reached the maximum count
325            if let Some(max) = max_count {
326                if count >= max {
327                    log::info!("Reached maximum event count ({}), stopping", max);
328                    break;
329                }
330            }
331
332            // Check if we've exceeded the time limit
333            if let Some(end_time) = deadline {
334                if Instant::now() >= end_time {
335                    log::info!("Reached maximum time limit, stopping");
336                    break;
337                }
338            }
339
340            line_number += 1;
341
342            // Skip empty lines
343            if line.trim().is_empty() {
344                continue;
345            }
346
347            // Try to parse the JSON
348            let record: RecordEntry<T> = match serde_json::from_str(&line) {
349                Ok(record) => record,
350                Err(e) => {
351                    log::warn!(
352                        "Failed to parse JSON on line {}: {}. Skipping.",
353                        line_number,
354                        e
355                    );
356                    continue;
357                }
358            };
359
360            let timestamp = record.timestamp;
361            let event = record.event;
362
363            // Handle timing if needed
364            if timed && prev_timestamp.is_some() {
365                let prev = prev_timestamp.unwrap();
366                if timestamp > prev {
367                    let wait_time = timestamp - prev;
368                    tokio::time::sleep(Duration::from_millis(wait_time)).await;
369                }
370            }
371
372            // Send the event
373            event_tx.send(event).await.map_err(|e| {
374                io::Error::new(io::ErrorKind::Other, format!("Failed to send event: {}", e))
375            })?;
376
377            // Update previous timestamp and count
378            prev_timestamp = Some(timestamp);
379            count += 1;
380        }
381
382        if count == 0 {
383            log::warn!("No events to send from file: {}", display_name);
384        } else {
385            log::info!("Sent {} events from {}", count, display_name);
386        }
387
388        Ok(count)
389    }
390}
391
/// Helper function to create a rotated file path with an index suffix
///
/// The index is inserted into the file-name component only:
/// * before the extension when there is one (`dir/events.jsonl`, 1 -> `dir/events1.jsonl`);
/// * appended to the name otherwise (`dir/events`, 1 -> `dir/events1`; `.hidden` -> `.hidden1`).
///
/// Dots in parent directories are never mistaken for an extension. Non-UTF-8 paths are
/// preserved because the path is manipulated as an `OsStr`, not a lossy string.
fn create_rotated_path(base_path: &Path, index: usize) -> PathBuf {
    let suffix = index.to_string();

    let Some(file_name) = base_path.file_name() else {
        // No file-name component (e.g. an empty path or one ending in `..`): append the index
        let mut raw = base_path.as_os_str().to_os_string();
        raw.push(&suffix);
        return PathBuf::from(raw);
    };

    let mut new_name = std::ffi::OsString::with_capacity(file_name.len() + suffix.len());
    match (base_path.file_stem(), base_path.extension()) {
        (Some(stem), Some(extension)) => {
            // Insert the index between the stem and the final extension
            new_name.push(stem);
            new_name.push(&suffix);
            new_name.push(".");
            new_name.push(extension);
        }
        _ => {
            // No extension: append the index to the whole file name
            new_name.push(file_name);
            new_name.push(&suffix);
        }
    }
    base_path.with_file_name(new_name)
}
405
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::tempdir;

    // Type alias for the TestEvent recorder
    type TestEventRecorder = Recorder<TestEvent>;

    /// Upper bound on how long a test waits for the background writer task to catch up.
    /// Polling against a generous deadline stays reliable on slow or loaded machines,
    /// where a fixed short sleep may not be long enough.
    const PROCESSING_TIMEOUT: Duration = Duration::from_secs(10);

    // More complex event type
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        id: u64,
        name: String,
        values: Vec<i32>,
    }

    impl TestEvent {
        // Helper method to generate a random test event
        fn new(id: u64) -> Self {
            // Generate a random number of values between 1 and 100
            let num_values = rand::random_range(1..=100);

            // Generate random values (integers between -100 and 100)
            let values = (0..num_values)
                .map(|_| rand::random_range(-100..=100))
                .collect();

            // Create a name based on the ID
            let name = format!("event_{}", id);

            TestEvent { id, name, values }
        }

        // Helper method to generate a vector of random events
        fn generate_events(count: usize) -> Vec<Self> {
            (0..count).map(|i| Self::new(i as u64)).collect()
        }
    }

    /// Poll the recorder until it reports at least `expected` written events.
    ///
    /// Panics if that does not happen within `timeout`.
    async fn wait_for_event_count(recorder: &TestEventRecorder, expected: usize, timeout: Duration) {
        let deadline = Instant::now() + timeout;
        loop {
            let count = recorder.event_count().await;
            if count >= expected {
                return;
            }
            assert!(
                Instant::now() < deadline,
                "Timed out waiting for {} recorded events (got {})",
                expected,
                count
            );
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    /// Poll `path` until it holds at least `expected` complete (newline-terminated) lines,
    /// then return its content.
    ///
    /// Used to wait for the recorder's final flush after shutdown. Panics after `timeout`.
    async fn wait_for_complete_lines(path: &Path, expected: usize, timeout: Duration) -> String {
        let deadline = Instant::now() + timeout;
        loop {
            // The file may be mid-write; treat unreadable content as "not ready yet"
            let content = fs::read_to_string(path).await.unwrap_or_default();
            if content.ends_with('\n') && content.lines().count() >= expected {
                return content;
            }
            assert!(
                Instant::now() < deadline,
                "Timed out waiting for {} lines in {}",
                expected,
                path.display()
            );
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[test]
    fn test_create_rotated_path_inserts_index_before_extension() {
        assert_eq!(
            create_rotated_path(Path::new("events.jsonl"), 1),
            PathBuf::from("events1.jsonl")
        );
        assert_eq!(
            create_rotated_path(Path::new("out/events.jsonl"), 12),
            PathBuf::from("out/events12.jsonl")
        );
        assert_eq!(
            create_rotated_path(Path::new("events"), 3),
            PathBuf::from("events3")
        );
    }

    #[tokio::test]
    async fn test_recorder_streams_events_to_file() {
        // Create a temporary directory for output files
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("events.jsonl");

        let token = CancellationToken::new();
        let recorder = TestEventRecorder::new(token.clone(), &file_path, None, None, None)
            .await
            .unwrap();
        let event_tx = recorder.event_sender();

        // Create test events using generate_events
        let events = TestEvent::generate_events(2);

        // Wait some time before the first event
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Send the events
        for event in &events {
            event_tx.send(event.clone()).await.unwrap();
        }

        // Yield for a known interval; the recorder stamps the first event once it runs
        tokio::time::sleep(Duration::from_millis(10)).await;
        wait_for_event_count(&recorder, events.len(), PROCESSING_TIMEOUT).await;

        // Check that both events were recorded
        assert_eq!(recorder.event_count().await, 2);

        // Only a lower bound is meaningful: timer rounding and scheduler load can make the
        // sleep above arbitrarily longer than requested.
        let elapsed_ms = recorder.elapsed_time().await.unwrap().as_millis();
        assert!(
            elapsed_ms >= 9,
            "Expected at least 9 ms since the first event, got {} ms",
            elapsed_ms
        );

        // Shut down and wait for the final flush to reach the file
        recorder.shutdown();
        let content = wait_for_complete_lines(&file_path, events.len(), PROCESSING_TIMEOUT).await;
        let lines: Vec<&str> = content.lines().collect();

        // Print the content of the JSONL file
        println!("JSONL file content:");
        for (i, line) in lines.iter().enumerate() {
            println!("Line {}: {}", i + 1, line);
        }

        assert_eq!(lines.len(), 2, "Expected 2 lines in the file");

        // Parse the lines to verify events
        let entry1: RecordEntry<TestEvent> = serde_json::from_str(lines[0]).unwrap();
        let entry2: RecordEntry<TestEvent> = serde_json::from_str(lines[1]).unwrap();

        assert_eq!(entry1.event, events[0]);
        assert_eq!(entry2.event, events[1]);
        assert!(entry2.timestamp >= entry1.timestamp);
    }

    #[ignore]
    #[tokio::test]
    async fn load_test_100k_events() {
        // Create a temporary directory for output files
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("events.jsonl");

        // Create a cancellation token for the recorder
        let token = CancellationToken::new();

        // 100,000 events at 10,000 lines per file should produce exactly 10 files
        const MAX_LINES_PER_FILE: usize = 10_000;
        const NUM_EVENTS: usize = 100_000;
        const NUM_FILES: usize = NUM_EVENTS / MAX_LINES_PER_FILE;

        let recorder = TestEventRecorder::new(
            token.clone(),
            &file_path,
            Some(MAX_LINES_PER_FILE),
            None,
            None,
        )
        .await
        .unwrap();
        let event_tx = recorder.event_sender();

        println!("Generating {} events...", NUM_EVENTS);
        let events = TestEvent::generate_events(NUM_EVENTS);

        // Send events with progress reporting
        for (i, event) in events.iter().enumerate() {
            event_tx.send(event.clone()).await.unwrap();

            // Print progress every 10,000 events
            if i > 0 && i % 10_000 == 0 {
                println!("Sent {} events...", i);
            }
        }

        // Wait for the recorder to process all events
        println!("Waiting for events to be processed...");
        wait_for_event_count(&recorder, NUM_EVENTS, PROCESSING_TIMEOUT).await;

        let count = recorder.event_count().await;
        println!("Recorded event count: {}", count);
        assert_eq!(count, NUM_EVENTS);

        // The base file followed by rotated files 1..NUM_FILES
        let expected_files: Vec<PathBuf> = std::iter::once(file_path.clone())
            .chain((1..NUM_FILES).map(|i| create_rotated_path(&file_path, i)))
            .collect();

        // Shut down and wait until the last file has been completely flushed
        recorder.shutdown();
        wait_for_complete_lines(
            expected_files.last().unwrap(),
            MAX_LINES_PER_FILE,
            PROCESSING_TIMEOUT,
        )
        .await;

        let found_files: Vec<&PathBuf> = expected_files.iter().filter(|p| p.exists()).collect();
        assert_eq!(
            found_files.len(),
            NUM_FILES,
            "Expected {} files due to rotation with {} lines each",
            NUM_FILES,
            MAX_LINES_PER_FILE
        );

        // Verify line counts and timestamp ordering of every file in a single pass
        let mut total_lines = 0;
        for (i, path) in found_files.iter().enumerate() {
            println!("Checking file {}: {}", i, path.display());

            let content = fs::read_to_string(path).await.unwrap();
            let mut line_count = 0;
            let mut prev_timestamp: Option<u64> = None;
            let mut unsorted_count = 0;

            for (line_idx, line) in content.lines().enumerate() {
                line_count += 1;
                let entry: RecordEntry<TestEvent> = serde_json::from_str(line).unwrap();

                if let Some(prev) = prev_timestamp {
                    if entry.timestamp < prev {
                        unsorted_count += 1;
                        if unsorted_count <= 5 {
                            // Only log first 5 violations to avoid spam
                            println!(
                                "Timestamp order violation in file {} at line {}: {} < {}",
                                path.display(),
                                line_idx + 1,
                                entry.timestamp,
                                prev
                            );
                        }
                    }
                }

                prev_timestamp = Some(entry.timestamp);
            }

            if i < found_files.len() - 1 {
                // All files except the last one should have exactly MAX_LINES_PER_FILE lines
                assert_eq!(
                    line_count,
                    MAX_LINES_PER_FILE,
                    "File {} should contain exactly {} lines",
                    path.display(),
                    MAX_LINES_PER_FILE
                );
            } else {
                // The last file might have fewer lines
                assert!(
                    line_count <= MAX_LINES_PER_FILE,
                    "Last file should contain at most {} lines",
                    MAX_LINES_PER_FILE
                );
            }

            assert_eq!(
                unsorted_count, 0,
                "Timestamps should be weakly sorted within each file"
            );

            total_lines += line_count;
        }

        assert_eq!(
            total_lines, NUM_EVENTS,
            "Total lines across all files should match NUM_EVENTS"
        );

        println!("Load test with file rotation completed successfully");
    }
}