Skip to main content

rivet/state/
file_log.rs

1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
5/// One row from `file_log` (formerly `file_manifest`; renamed in schema v8).
6#[derive(Debug, serde::Serialize)]
7#[allow(dead_code)]
8pub struct FileRecord {
9    pub run_id: String,
10    pub export_name: String,
11    pub file_name: String,
12    pub row_count: i64,
13    pub bytes: i64,
14    pub format: String,
15    pub compression: Option<String>,
16    pub created_at: String,
17}
18
19/// File log store — reads and writes `file_log`.
20///
21/// Historical note: this table was named `file_manifest` prior to schema v8.
22/// The name was reclaimed for the 0.7.0 cloud-output JSON manifest contract;
23/// the internal SQLite log was renamed to `file_log` to remove the overload.
24///
25/// Invariant I2 (Write Before Log) governs when `record_file` is called:
26/// only after a destination write succeeds.  Failed writes produce no log entry.
27/// Invariant I7 (File-Log Failure Is Non-Fatal) means callers use `let _ = record_file(...)`.
28impl StateStore {
29    #[allow(clippy::too_many_arguments)]
30    pub fn record_file(
31        &self,
32        run_id: &str,
33        export_name: &str,
34        file_name: &str,
35        row_count: i64,
36        bytes: i64,
37        format: &str,
38        compression: Option<&str>,
39    ) -> Result<()> {
40        let now = chrono::Utc::now().to_rfc3339();
41        let sql = "INSERT INTO file_log (run_id, export_name, file_name, row_count, bytes, format, compression, created_at)
42             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)";
43        match &self.conn {
44            StateConn::Sqlite(c) => {
45                c.execute(
46                    sql,
47                    rusqlite::params![
48                        run_id,
49                        export_name,
50                        file_name,
51                        row_count,
52                        bytes,
53                        format,
54                        compression,
55                        now
56                    ],
57                )?;
58            }
59            StateConn::Postgres(client) => {
60                let mut c = client.borrow_mut();
61                c.execute(
62                    &pg_sql(sql),
63                    &[
64                        &run_id,
65                        &export_name,
66                        &file_name,
67                        &row_count,
68                        &bytes,
69                        &format,
70                        &compression,
71                        &now,
72                    ],
73                )?;
74            }
75        }
76        Ok(())
77    }
78
79    pub fn get_files(&self, export_name: Option<&str>, limit: usize) -> Result<Vec<FileRecord>> {
80        let cols =
81            "run_id, export_name, file_name, row_count, bytes, format, compression, created_at";
82        let limit_i64 = limit as i64;
83        match &self.conn {
84            StateConn::Sqlite(c) => {
85                let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
86                    name,
87                ) = export_name
88                {
89                    (
90                        "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at \
91                             FROM file_log WHERE export_name = ?1 ORDER BY id DESC LIMIT ?2",
92                        vec![Box::new(name.to_string()), Box::new(limit_i64)],
93                    )
94                } else {
95                    (
96                        "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at \
97                             FROM file_log ORDER BY id DESC LIMIT ?1",
98                        vec![Box::new(limit_i64)],
99                    )
100                };
101                let mut stmt = c.prepare(sql)?;
102                let params_refs: Vec<&dyn rusqlite::types::ToSql> =
103                    params.iter().map(|p| p.as_ref()).collect();
104                let rows = stmt.query_map(params_refs.as_slice(), |row| {
105                    Ok(FileRecord {
106                        run_id: row.get(0)?,
107                        export_name: row.get(1)?,
108                        file_name: row.get(2)?,
109                        row_count: row.get(3)?,
110                        bytes: row.get(4)?,
111                        format: row.get(5)?,
112                        compression: row.get(6)?,
113                        created_at: row.get(7)?,
114                    })
115                })?;
116                rows.collect::<std::result::Result<Vec<_>, _>>()
117                    .map_err(Into::into)
118            }
119            StateConn::Postgres(client) => {
120                // Single borrow for the duration of this call; safe because all Postgres
121                // operations in StateStore are sequential (no re-entrant borrows).
122                let mut c = client.borrow_mut();
123                let rows = if let Some(name) = export_name {
124                    c.query(
125                        &format!("SELECT {} FROM file_log WHERE export_name = $1 ORDER BY id DESC LIMIT $2", cols),
126                        &[&name, &limit_i64],
127                    )?
128                } else {
129                    c.query(
130                        &format!("SELECT {} FROM file_log ORDER BY id DESC LIMIT $1", cols),
131                        &[&limit_i64],
132                    )?
133                };
134                Ok(rows
135                    .iter()
136                    .map(|row| FileRecord {
137                        run_id: row.get(0),
138                        export_name: row.get(1),
139                        file_name: row.get(2),
140                        row_count: row.get(3),
141                        bytes: row.get(4),
142                        format: row.get(5),
143                        compression: row.get(6),
144                        created_at: row.get(7),
145                    })
146                    .collect())
147            }
148        }
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory store per test so tests never share state.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    #[test]
    fn record_and_query_files() {
        let s = store();
        s.record_file(
            "run_001",
            "orders",
            "orders_20260329.parquet",
            50000,
            4096,
            "parquet",
            Some("zstd"),
        )
        .unwrap();
        s.record_file(
            "run_001",
            "orders",
            "orders_20260329_chunk1.parquet",
            25000,
            2048,
            "parquet",
            Some("zstd"),
        )
        .unwrap();
        s.record_file(
            "run_002",
            "users",
            "users_20260329.csv",
            1000,
            500,
            "csv",
            None,
        )
        .unwrap();

        let files = s.get_files(Some("orders"), 10).unwrap();
        assert_eq!(files.len(), 2);
        assert!(files.iter().all(|f| f.export_name == "orders"));
        // Newest first: the chunk file was recorded after the main file.
        assert_eq!(files[0].run_id, "run_001");
        assert_eq!(files[0].file_name, "orders_20260329_chunk1.parquet");
        assert_eq!(files[0].row_count, 25000);
        assert_eq!(files[0].bytes, 2048);
        assert_eq!(files[0].compression.as_deref(), Some("zstd"));
        assert!(!files[0].created_at.is_empty());

        let all = s.get_files(None, 10).unwrap();
        assert_eq!(all.len(), 3);
        // The uncompressed CSV was logged last and must round-trip NULL compression.
        assert_eq!(all[0].file_name, "users_20260329.csv");
        assert_eq!(all[0].format, "csv");
        assert_eq!(all[0].compression, None);
    }

    #[test]
    fn files_limit_works() {
        let s = store();
        for i in 0..10 {
            s.record_file(
                &format!("r{}", i),
                "t",
                &format!("f{}.parquet", i),
                i,
                i * 100,
                "parquet",
                None,
            )
            .unwrap();
        }
        let files = s.get_files(Some("t"), 3).unwrap();
        assert_eq!(files.len(), 3);
        // The limit must keep the most recent rows, not arbitrary ones.
        let names: Vec<&str> = files.iter().map(|f| f.file_name.as_str()).collect();
        assert_eq!(names, ["f9.parquet", "f8.parquet", "f7.parquet"]);
    }

    #[test]
    fn files_limit_edge_cases() {
        let s = store();
        for i in 0..4 {
            s.record_file(
                &format!("r{}", i),
                "t",
                &format!("f{}.parquet", i),
                i,
                i * 10,
                "parquet",
                None,
            )
            .unwrap();
        }
        assert!(s.get_files(Some("t"), 0).unwrap().is_empty());
        // An oversized limit must behave as "everything", not fail or wrap.
        assert_eq!(s.get_files(None, usize::MAX).unwrap().len(), 4);
        assert!(s.get_files(Some("missing"), 10).unwrap().is_empty());
    }
}