tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
use tsink::{DataPoint, Label, Row, StorageBuilder};

#[test]
fn test_high_contention_concurrent_writes() {
    let temp_dir = TempDir::new().unwrap();
    let write_timeout = if cfg!(windows) {
        Duration::from_secs(60)
    } else {
        Duration::from_secs(10)
    };
    let storage = Arc::new(
        StorageBuilder::new()
            .with_data_path(temp_dir.path())
            .with_max_writers(4)
            .with_write_timeout(write_timeout)
            .build()
            .unwrap(),
    );

    let num_threads = 20;
    let writes_per_thread = 100;
    let barrier = Arc::new(Barrier::new(num_threads));
    let mut handles = vec![];

    for thread_id in 0..num_threads {
        let storage = storage.clone();
        let barrier = barrier.clone();
        let handle = thread::spawn(move || {
            let rows = (0..writes_per_thread)
                .map(|i| {
                    let timestamp = (thread_id * writes_per_thread + i + 1) as i64;
                    Row::new(
                        "contention_metric",
                        DataPoint::new(timestamp, thread_id as f64 + i as f64 / 100.0),
                    )
                })
                .collect::<Vec<_>>();
            barrier.wait();
            storage.insert_rows(&rows).unwrap();
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    let points = storage
        .select("contention_metric", &[], 1, i64::MAX)
        .unwrap();
    assert_eq!(
        points.len(),
        num_threads * writes_per_thread,
        "Some writes were lost under contention"
    );
}

#[test]
fn test_concurrent_reads_during_writes() {
    let temp_dir = TempDir::new().unwrap();
    let storage = Arc::new(
        StorageBuilder::new()
            .with_data_path(temp_dir.path())
            .build()
            .unwrap(),
    );

    let storage_writer = storage.clone();
    let writer = thread::spawn(move || {
        for i in 0..1000 {
            let rows = vec![Row::new(
                "concurrent_metric",
                DataPoint::new((i + 1) as i64 * 100, i as f64),
            )];
            storage_writer.insert_rows(&rows).unwrap();
            if i % 10 == 0 {
                thread::sleep(Duration::from_micros(100));
            }
        }
    });

    let mut readers = vec![];
    for reader_id in 0..5 {
        let storage = storage.clone();
        let handle = thread::spawn(move || {
            let mut last_count = 0;
            for _ in 0..20 {
                thread::sleep(Duration::from_millis(5));
                let points = storage
                    .select("concurrent_metric", &[], 1, i64::MAX)
                    .unwrap();
                assert!(
                    points.len() >= last_count,
                    "Reader {} saw decreasing points: {} -> {}",
                    reader_id,
                    last_count,
                    points.len()
                );
                last_count = points.len();
            }
            last_count
        });
        readers.push(handle);
    }

    writer.join().unwrap();
    for handle in readers {
        handle.join().unwrap();
    }
}

#[test]
fn test_concurrent_different_metrics() {
    let temp_dir = TempDir::new().unwrap();
    let storage = Arc::new(
        StorageBuilder::new()
            .with_data_path(temp_dir.path())
            .build()
            .unwrap(),
    );

    let mut handles = vec![];

    for thread_id in 0..10 {
        let storage = storage.clone();
        let handle = thread::spawn(move || {
            let metric_name = format!("metric_{}", thread_id);
            for i in 0..100 {
                let rows = vec![Row::new(
                    &metric_name,
                    DataPoint::new((i + 1) as i64, thread_id as f64 * 100.0 + i as f64),
                )];
                storage.insert_rows(&rows).unwrap();
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    for thread_id in 0..10 {
        let metric_name = format!("metric_{}", thread_id);
        let points = storage.select(&metric_name, &[], 1, i64::MAX).unwrap();
        assert_eq!(points.len(), 100, "Metric {} has wrong count", metric_name);
    }
}

#[test]
fn test_concurrent_labeled_metrics() {
    let temp_dir = TempDir::new().unwrap();
    let storage = Arc::new(
        StorageBuilder::new()
            .with_data_path(temp_dir.path())
            .build()
            .unwrap(),
    );

    let mut handles = vec![];

    for thread_id in 0..8 {
        let storage = storage.clone();
        let handle = thread::spawn(move || {
            let labels = vec![
                Label::new("thread", thread_id.to_string()),
                Label::new("type", if thread_id % 2 == 0 { "even" } else { "odd" }),
            ];

            for i in 0..50 {
                let rows = vec![Row::with_labels(
                    "labeled_metric",
                    labels.clone(),
                    DataPoint::new((i + 1) as i64, thread_id as f64 * 10.0 + i as f64),
                )];
                storage.insert_rows(&rows).unwrap();
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    for thread_id in 0..8 {
        let labels = vec![
            Label::new("thread", thread_id.to_string()),
            Label::new("type", if thread_id % 2 == 0 { "even" } else { "odd" }),
        ];
        let points = storage
            .select("labeled_metric", &labels, 1, i64::MAX)
            .unwrap();
        assert_eq!(
            points.len(),
            50,
            "Thread {} labeled metric has wrong count",
            thread_id
        );
    }
}