1use serde::{Deserialize, Serialize};
17use std::io;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::fs::{self, File, OpenOptions};
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
23use tokio::sync::{mpsc, Mutex};
24use tokio_util::sync::CancellationToken;
25use tracing as log;
26
27#[derive(Serialize, Deserialize)]
29struct RecordEntry<T>
30where
31 T: Clone,
32{
33 timestamp: u64,
34 event: T,
35}
36
37pub struct Recorder<T> {
39 event_tx: mpsc::Sender<T>,
41 cancel: CancellationToken,
43 event_count: Arc<Mutex<usize>>,
45 first_event_time: Arc<Mutex<Option<Instant>>>,
47}
48
49impl<T> Recorder<T>
50where
51 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
52{
53 pub async fn new<P: AsRef<Path>>(
70 token: CancellationToken,
71 output_path: P,
72 max_lines_per_file: Option<usize>,
73 max_count: Option<usize>,
74 max_time: Option<f64>,
75 ) -> io::Result<Self> {
76 let (event_tx, mut event_rx) = mpsc::channel::<T>(2048);
77 let event_count = Arc::new(Mutex::new(0));
78 let event_count_clone = event_count.clone();
79 let cancel_clone = token.clone();
80 let start_time = Instant::now();
81 let first_event_time = Arc::new(Mutex::new(None));
82 let first_event_time_clone = first_event_time.clone();
83
84 if let Some(parent) = output_path.as_ref().parent() {
86 if !parent.exists() {
87 fs::create_dir_all(parent).await?;
88 }
89 }
90
91 let file = OpenOptions::new()
93 .create(true)
94 .write(true)
95 .truncate(true)
96 .open(&output_path)
97 .await?;
98
99 let file_path = output_path.as_ref().to_path_buf();
100
101 tokio::spawn(async move {
103 let start_time = start_time;
104 let mut writer = BufWriter::with_capacity(32768, file);
105 let mut line_count = 0;
106 let mut file_index = 0;
107 let base_path = file_path.clone();
108
109 let max_time_deadline = max_time.map(|secs| {
111 let duration = Duration::from_secs_f64(secs);
112 start_time + duration
113 });
114
115 loop {
116 if let Some(deadline) = max_time_deadline {
118 if Instant::now() >= deadline {
119 log::info!("Recorder reached max time limit, shutting down");
120 if let Err(e) = writer.flush().await {
122 log::error!("Failed to flush on time limit shutdown: {}", e);
123 }
124 cancel_clone.cancel();
125 return;
126 }
127 }
128
129 tokio::select! {
130 biased;
131
132 _ = cancel_clone.cancelled() => {
133 if let Err(e) = writer.flush().await {
135 log::error!("Failed to flush on shutdown: {}", e);
136 }
137
138 log::debug!("Recorder task shutting down");
139 return;
140 }
141
142 Some(event) = event_rx.recv() => {
143 {
145 let mut first_time = first_event_time_clone.lock().await;
146 if first_time.is_none() {
147 *first_time = Some(Instant::now());
148 }
149 }
150
151 let elapsed_ms = start_time.elapsed().as_millis() as u64;
153
154 let entry = RecordEntry {
156 timestamp: elapsed_ms,
157 event,
158 };
159
160 let json = match serde_json::to_string(&entry) {
162 Ok(json) => json,
163 Err(e) => {
164 log::error!("Failed to serialize event: {}", e);
165 continue;
166 }
167 };
168
169 if let Err(e) = writer.write_all(json.as_bytes()).await {
171 log::error!("Failed to write event: {}", e);
172 continue;
173 }
174
175 if let Err(e) = writer.write_all(b"\n").await {
177 log::error!("Failed to write newline: {}", e);
178 continue;
179 }
180
181 line_count += 1;
183
184 if let Some(max_lines) = max_lines_per_file {
186 if line_count >= max_lines {
187 if let Err(e) = writer.flush().await {
189 log::error!("Failed to flush file before rotation: {}", e);
190 }
191
192 file_index += 1;
194 let new_path = create_rotated_path(&base_path, file_index);
195
196 match OpenOptions::new()
198 .create(true)
199 .write(true)
200 .truncate(true)
201 .open(&new_path)
202 .await
203 {
204 Ok(new_file) => {
205 writer = BufWriter::with_capacity(32768, new_file);
206 line_count = 0;
207 log::info!("Rotated to new file: {}", new_path.display());
208 },
209 Err(e) => {
210 log::error!("Failed to open rotated file {}: {}", new_path.display(), e);
211 }
213 }
214 }
215 }
216
217 let mut count = event_count_clone.lock().await;
219 *count += 1;
220
221 if let Some(max) = max_count {
223 if *count >= max {
224 log::info!("Recorder reached max event count ({}), shutting down", max);
225 if let Err(e) = writer.flush().await {
227 log::error!("Failed to flush on count limit shutdown: {}", e);
228 }
229 drop(count);
231 cancel_clone.cancel();
232 return;
233 }
234 }
235 }
236 }
237 }
238 });
239
240 Ok(Self {
241 event_tx,
242 cancel: token,
243 event_count,
244 first_event_time,
245 })
246 }
247
248 pub fn event_sender(&self) -> mpsc::Sender<T> {
250 self.event_tx.clone()
251 }
252
253 pub async fn event_count(&self) -> usize {
255 *self.event_count.lock().await
256 }
257
258 pub async fn elapsed_time(&self) -> io::Result<Duration> {
262 let first_time = self.first_event_time.lock().await;
263 match *first_time {
264 Some(time) => Ok(time.elapsed()),
265 None => Err(io::Error::new(
266 io::ErrorKind::Other,
267 "No events received yet",
268 )),
269 }
270 }
271
272 pub fn shutdown(&self) {
274 self.cancel.cancel();
275 }
276
277 pub async fn send_events<P: AsRef<Path>>(
292 filename: P,
293 event_tx: &mpsc::Sender<T>,
294 timed: bool,
295 max_count: Option<usize>,
296 max_time: Option<f64>,
297 ) -> io::Result<usize> {
298 let display_name = filename.as_ref().display().to_string();
300
301 if !filename.as_ref().exists() {
303 return Err(io::Error::new(
304 io::ErrorKind::NotFound,
305 format!("File not found: {}", display_name),
306 ));
307 }
308
309 let start_time = Instant::now();
311 let deadline = max_time.map(|secs| start_time + Duration::from_secs_f64(secs));
312
313 let file = File::open(&filename).await?;
315 let reader = BufReader::with_capacity(32768, file);
316 let mut lines = reader.lines();
317
318 let mut count = 0;
319 let mut line_number = 0;
320 let mut prev_timestamp: Option<u64> = None;
321
322 while let Some(line) = lines.next_line().await? {
324 if let Some(max) = max_count {
326 if count >= max {
327 log::info!("Reached maximum event count ({}), stopping", max);
328 break;
329 }
330 }
331
332 if let Some(end_time) = deadline {
334 if Instant::now() >= end_time {
335 log::info!("Reached maximum time limit, stopping");
336 break;
337 }
338 }
339
340 line_number += 1;
341
342 if line.trim().is_empty() {
344 continue;
345 }
346
347 let record: RecordEntry<T> = match serde_json::from_str(&line) {
349 Ok(record) => record,
350 Err(e) => {
351 log::warn!(
352 "Failed to parse JSON on line {}: {}. Skipping.",
353 line_number,
354 e
355 );
356 continue;
357 }
358 };
359
360 let timestamp = record.timestamp;
361 let event = record.event;
362
363 if timed && prev_timestamp.is_some() {
365 let prev = prev_timestamp.unwrap();
366 if timestamp > prev {
367 let wait_time = timestamp - prev;
368 tokio::time::sleep(Duration::from_millis(wait_time)).await;
369 }
370 }
371
372 event_tx.send(event).await.map_err(|e| {
374 io::Error::new(io::ErrorKind::Other, format!("Failed to send event: {}", e))
375 })?;
376
377 prev_timestamp = Some(timestamp);
379 count += 1;
380 }
381
382 if count == 0 {
383 log::warn!("No events to send from file: {}", display_name);
384 } else {
385 log::info!("Sent {} events from {}", count, display_name);
386 }
387
388 Ok(count)
389 }
390}
391
/// Builds the path of the `index`-th rotated file derived from `base_path`.
///
/// The index is inserted between the file stem and the extension of the final
/// path component (`dir/events.jsonl` -> `dir/events1.jsonl`), or appended
/// when that component has no extension (`dir/events` -> `dir/events1`).
///
/// Only the file name is inspected, so dots in directory names and a leading
/// dot in the file name (`.hidden`) are never mistaken for an extension
/// separator. The path is handled as an `OsString`, so non-UTF-8 paths are
/// preserved rather than lossily converted.
fn create_rotated_path(base_path: &Path, index: usize) -> PathBuf {
    let suffix = index.to_string();

    match base_path.file_stem() {
        Some(stem) => {
            let mut name = stem.to_os_string();
            name.push(&suffix);
            if let Some(extension) = base_path.extension() {
                name.push(".");
                name.push(extension);
            }
            base_path.with_file_name(name)
        }
        // No final file component (e.g. `/` or `..`): fall back to appending.
        None => {
            let mut raw = base_path.as_os_str().to_os_string();
            raw.push(&suffix);
            PathBuf::from(raw)
        }
    }
}
405
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::tempdir;

    type TestEventRecorder = Recorder<TestEvent>;

    /// Event with a randomly sized payload, used to exercise the recorder.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        id: u64,
        name: String,
        values: Vec<i32>,
    }

    impl TestEvent {
        /// Creates an event named after `id` carrying 1 to 100 random values.
        fn new(id: u64) -> Self {
            let num_values = rand::random_range(1..=100);

            let values = (0..num_values)
                .map(|_| rand::random_range(-100..=100))
                .collect();

            let name = format!("event_{}", id);

            TestEvent { id, name, values }
        }

        /// Creates `count` events with ids `0..count`.
        fn generate_events(count: usize) -> Vec<Self> {
            (0..count).map(|i| Self::new(i as u64)).collect()
        }
    }

    #[tokio::test]
    async fn test_recorder_streams_events_to_file() {
        // The wait after sending is 10 ms; allow 1 ms of rounding below it and
        // plenty of headroom above so a busy machine does not fail the test.
        const MIN_ELAPSED_MS: u128 = 9;
        const MAX_ELAPSED_MS: u128 = 250;

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("events.jsonl");

        let token = CancellationToken::new();
        let recorder = TestEventRecorder::new(token.clone(), &file_path, None, None, None)
            .await
            .unwrap();
        let event_tx = recorder.event_sender();

        let events = TestEvent::generate_events(2);

        tokio::time::sleep(Duration::from_millis(10)).await;

        for event in &events {
            event_tx.send(event.clone()).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(10)).await;

        assert_eq!(recorder.event_count().await, 2);

        let elapsed_ms = recorder.elapsed_time().await.unwrap().as_millis();
        assert!(
            (MIN_ELAPSED_MS..=MAX_ELAPSED_MS).contains(&elapsed_ms),
            "Actual elapsed time: {} ms",
            elapsed_ms
        );

        recorder.shutdown();
        // Give the writer task time to flush before reading the file back.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let content = fs::read_to_string(&file_path).await.unwrap();
        let lines: Vec<&str> = content.lines().collect();

        println!("JSONL file content:");
        for (i, line) in lines.iter().enumerate() {
            println!("Line {}: {}", i + 1, line);
        }

        assert_eq!(lines.len(), 2, "Expected 2 lines in the file");

        let entry1: RecordEntry<TestEvent> = serde_json::from_str(lines[0]).unwrap();
        let entry2: RecordEntry<TestEvent> = serde_json::from_str(lines[1]).unwrap();

        assert_eq!(entry1.event, events[0]);
        assert_eq!(entry2.event, events[1]);
        assert!(entry2.timestamp >= entry1.timestamp);
    }

    #[ignore]
    #[tokio::test]
    async fn load_test_100k_events() {
        const MAX_LINES_PER_FILE: usize = 10_000;
        const NUM_EVENTS: usize = 100_000;
        // Number of files needed to hold every event.
        const EXPECTED_FILES: usize = NUM_EVENTS / MAX_LINES_PER_FILE;

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("events.jsonl");

        let token = CancellationToken::new();
        let recorder = TestEventRecorder::new(
            token.clone(),
            &file_path,
            Some(MAX_LINES_PER_FILE),
            None,
            None,
        )
        .await
        .unwrap();
        let event_tx = recorder.event_sender();

        println!("Generating {} events...", NUM_EVENTS);
        let events = TestEvent::generate_events(NUM_EVENTS);

        for (i, event) in events.iter().enumerate() {
            event_tx.send(event.clone()).await.unwrap();

            if i > 0 && i % 10_000 == 0 {
                println!("Sent {} events...", i);
            }
        }

        println!("Waiting for events to be processed...");
        tokio::time::sleep(Duration::from_millis(1000)).await;

        let count = recorder.event_count().await;
        println!("Recorded event count: {}", count);
        assert_eq!(count, NUM_EVENTS);

        recorder.shutdown();
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The base file plus rotations 1..EXPECTED_FILES hold all events. The
        // recorder rotates as soon as a file is full, so one further (empty)
        // file may exist; it is deliberately not inspected.
        let found_files: Vec<PathBuf> = std::iter::once(file_path.clone())
            .chain((1..EXPECTED_FILES).map(|i| create_rotated_path(&file_path, i)))
            .filter(|path| path.exists())
            .collect();

        assert_eq!(
            found_files.len(),
            EXPECTED_FILES,
            "Expected {} files due to rotation with {} events each",
            EXPECTED_FILES,
            MAX_LINES_PER_FILE
        );

        let mut total_lines = 0;

        // Single pass per file: count lines and check timestamp ordering.
        for (i, path) in found_files.iter().enumerate() {
            println!("Checking file {}: {}", i, path.display());

            let content = fs::read_to_string(path).await.unwrap();

            let mut line_count = 0;
            let mut unsorted_count = 0;
            let mut prev_timestamp: Option<u64> = None;

            for line in content.lines() {
                line_count += 1;
                let entry: RecordEntry<TestEvent> = serde_json::from_str(line).unwrap();

                if let Some(prev) = prev_timestamp {
                    if entry.timestamp < prev {
                        unsorted_count += 1;
                        // Report only the first few violations to keep output short.
                        if unsorted_count <= 5 {
                            println!(
                                "Timestamp order violation in file {} at line {}: {} < {}",
                                path.display(),
                                line_count,
                                entry.timestamp,
                                prev
                            );
                        }
                    }
                }

                prev_timestamp = Some(entry.timestamp);
            }

            if i + 1 < found_files.len() {
                assert_eq!(
                    line_count,
                    MAX_LINES_PER_FILE,
                    "File {} should contain exactly {} lines",
                    path.display(),
                    MAX_LINES_PER_FILE
                );
            } else {
                assert!(
                    line_count <= MAX_LINES_PER_FILE,
                    "Last file should contain at most {} lines",
                    MAX_LINES_PER_FILE
                );
            }

            assert_eq!(
                unsorted_count, 0,
                "Timestamps should be weakly sorted within each file"
            );

            total_lines += line_count;
        }

        assert_eq!(
            total_lines, NUM_EVENTS,
            "Total lines across all files should match NUM_EVENTS"
        );

        println!("Load test with file rotation completed successfully");
    }
}