Skip to main content

vantage_log_writer/
log_writer.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5
6use crate::writer_task::{WRITE_QUEUE_CAPACITY, WriteOp, spawn};
7
8/// Append-only data source that writes JSONL records to files in `base_dir`.
9///
10/// Each table maps to one file: `{base_dir}/{table_name}.jsonl`. Inserts are
11/// queued on a background tokio task and the call returns as soon as the
12/// message lands on the channel — no fsync, no crash safety. Cloning shares
13/// the same channel and worker.
14#[derive(Clone)]
15pub struct LogWriter {
16    inner: Arc<Inner>,
17}
18
19struct Inner {
20    base_dir: PathBuf,
21    id_column: String,
22    tx: mpsc::Sender<WriteOp>,
23}
24
25impl LogWriter {
26    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
27        let (tx, rx) = mpsc::channel::<WriteOp>(WRITE_QUEUE_CAPACITY);
28        spawn(rx);
29        Self {
30            inner: Arc::new(Inner {
31                base_dir: base_dir.into(),
32                id_column: "id".to_string(),
33                tx,
34            }),
35        }
36    }
37
38    pub fn with_id_column(self, column: impl Into<String>) -> Self {
39        Self {
40            inner: Arc::new(Inner {
41                base_dir: self.inner.base_dir.clone(),
42                id_column: column.into(),
43                tx: self.inner.tx.clone(),
44            }),
45        }
46    }
47
48    pub fn base_dir(&self) -> &std::path::Path {
49        &self.inner.base_dir
50    }
51
52    pub fn id_column(&self) -> &str {
53        &self.inner.id_column
54    }
55
56    pub(crate) fn file_path(&self, table_name: &str) -> PathBuf {
57        self.inner.base_dir.join(format!("{}.jsonl", table_name))
58    }
59
60    pub(crate) fn sender(&self) -> &mpsc::Sender<WriteOp> {
61        &self.inner.tx
62    }
63}
64
65impl std::fmt::Debug for LogWriter {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("LogWriter")
68            .field("base_dir", &self.inner.base_dir)
69            .field("id_column", &self.inner.id_column)
70            .finish()
71    }
72}