Skip to main content

mirror_log/
log.rs

1use rusqlite::{params, Connection, Result};
2use sha2::{Digest, Sha256};
3use std::time::{SystemTime, UNIX_EPOCH};
4use uuid::Uuid;
5
/// Summary produced by [`verify_integrity`].
///
/// Every field is a row count. A NULL `content_hash` is counted in both
/// `missing_or_invalid_hashes` and `hash_mismatches`, so those two fields may overlap.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct IntegrityReport {
    /// Number of rows in `events`.
    pub total_events: i64,
    /// Events whose `content_hash` is NULL or not 64 characters long.
    pub missing_or_invalid_hashes: i64,
    /// Events whose stored `content_hash` differs from the SHA-256 recomputed from `content`.
    pub hash_mismatches: i64,
    /// Rows in `chunks` whose `event_id` matches no row in `events`.
    pub orphan_chunks: i64,
}

impl IntegrityReport {
    /// Returns `true` when none of the problem counters is non-zero.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.missing_or_invalid_hashes == 0 && self.hash_mismatches == 0 && self.orphan_chunks == 0
    }
}
13
/// What was persisted for one appended event.
///
/// Returned by `append_with_receipt` and `append_batch_with_receipts`. Those functions
/// set `timestamp` and `ingested_at` to the same value (Unix seconds). They store
/// `content_hash` as the lowercase hex SHA-256 of the event content.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AppendReceipt {
    /// Generated event id (a v4 UUID rendered as a string).
    pub id: String,
    /// Event timestamp written to the `timestamp` column.
    pub timestamp: i64,
    /// Value written to the `ingested_at` column.
    pub ingested_at: i64,
    /// Value written to the `content_hash` column.
    pub content_hash: String,
}
21
/// Current wall-clock time in whole seconds relative to the Unix epoch.
///
/// Never panics. If the system clock reads earlier than the epoch, the (truncated)
/// offset is returned as a negative number instead of aborting the append.
/// Values outside the `i64` range saturate rather than wrap.
fn unix_now_secs() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        // `duration_since` fails only when the clock is behind the epoch; the error
        // carries how far behind it is.
        Err(before_epoch) => {
            i64::try_from(before_epoch.duration().as_secs()).map_or(i64::MIN, |secs| -secs)
        }
    }
}
28
29/// Append a single event to the log
30pub fn append(
31    conn: &Connection,
32    source: &str,
33    content: &str,
34    meta: Option<&str>,
35) -> Result<String> {
36    Ok(append_with_receipt(conn, source, content, meta)?.id)
37}
38
39/// Append a single event and return the canonical persistence receipt.
40pub fn append_with_receipt(
41    conn: &Connection,
42    source: &str,
43    content: &str,
44    meta: Option<&str>,
45) -> Result<AppendReceipt> {
46    let id = Uuid::new_v4().to_string();
47    let now = unix_now_secs();
48    let last_ts: Option<i64> =
49        conn.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
50    let timestamp = match last_ts {
51        Some(last) if now <= last => last + 1,
52        _ => now,
53    };
54    let ingested_at = timestamp;
55
56    // Calculate content hash for deduplication
57    let mut hasher = Sha256::new();
58    hasher.update(content.as_bytes());
59    let content_hash = format!("{:x}", hasher.finalize());
60
61    conn.execute(
62        "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
63         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
64        params![
65            id,
66            timestamp,
67            source,
68            content,
69            meta,
70            ingested_at,
71            content_hash
72        ],
73    )?;
74
75    Ok(AppendReceipt {
76        id,
77        timestamp,
78        ingested_at,
79        content_hash,
80    })
81}
82
83/// Append multiple events from a batch in a single transaction
84pub fn append_batch(
85    conn: &Connection,
86    source: &str,
87    contents: &[&str],
88    meta: Option<&str>,
89) -> Result<Vec<String>> {
90    Ok(append_batch_with_receipts(conn, source, contents, meta)?
91        .into_iter()
92        .map(|receipt| receipt.id)
93        .collect())
94}
95
96/// Append multiple events and return a receipt for each inserted event.
97pub fn append_batch_with_receipts(
98    conn: &Connection,
99    source: &str,
100    contents: &[&str],
101    meta: Option<&str>,
102) -> Result<Vec<AppendReceipt>> {
103    let mut receipts = Vec::new();
104
105    // Wrap all inserts in a single transaction for atomicity
106    let tx = conn.unchecked_transaction()?;
107
108    let now = unix_now_secs();
109    let last_ts: Option<i64> =
110        tx.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
111    let mut timestamp = match last_ts {
112        Some(last) if now <= last => last + 1,
113        _ => now,
114    };
115
116    for content in contents {
117        let id = Uuid::new_v4().to_string();
118        let ingested_at = timestamp;
119
120        // Calculate content hash
121        let mut hasher = Sha256::new();
122        hasher.update(content.as_bytes());
123        let content_hash = format!("{:x}", hasher.finalize());
124
125        tx.execute(
126            "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
127             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
128            params![
129                id,
130                timestamp,
131                source,
132                content,
133                meta,
134                ingested_at,
135                content_hash
136            ],
137        )?;
138
139        receipts.push(AppendReceipt {
140            id,
141            timestamp,
142            ingested_at,
143            content_hash,
144        });
145        timestamp += 1;
146    }
147
148    tx.commit()?;
149
150    Ok(receipts)
151}
152
153/// Append events from stdin with configurable batch size
154pub fn append_stdin(
155    conn: &Connection,
156    source: &str,
157    meta: Option<&str>,
158    batch_size: usize,
159) -> std::io::Result<Vec<String>> {
160    use std::io::{self, BufRead};
161
162    let stdin = io::stdin();
163    let mut all_ids = Vec::new();
164    let mut batch: Vec<String> = Vec::new();
165
166    for line in stdin.lock().lines() {
167        let line = line?;
168        let trimmed = line.trim();
169        if !trimmed.is_empty() {
170            batch.push(trimmed.to_string());
171        }
172
173        // Execute batch when we reach the configured size
174        if batch.len() >= batch_size {
175            let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
176            match append_batch(conn, source, &contents, meta) {
177                Ok(ids) => all_ids.extend(ids),
178                Err(e) => {
179                    // If we fail, flush the remaining batch and return error
180                    if !batch.is_empty() {
181                        let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
182                        if let Ok(ids) = append_batch(conn, source, &contents, meta) {
183                            all_ids.extend(ids);
184                        }
185                    }
186                    return Err(io::Error::other(e));
187                }
188            }
189            batch.clear();
190        }
191    }
192
193    // Don't forget the last batch if it's non-empty
194    if !batch.is_empty() {
195        let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
196        match append_batch(conn, source, &contents, meta) {
197            Ok(ids) => all_ids.extend(ids),
198            Err(e) => {
199                return Err(io::Error::other(e));
200            }
201        }
202    }
203
204    Ok(all_ids)
205}
206
207/// Check if an event with the same content hash already exists
208pub fn is_duplicate(conn: &Connection, content_hash: &str) -> Result<bool> {
209    let exists: bool = conn.query_row(
210        "SELECT EXISTS(SELECT 1 FROM events WHERE content_hash = ?1)",
211        [content_hash],
212        |row| row.get(0),
213    )?;
214
215    Ok(exists)
216}
217
218/// Get statistics about the events
219pub fn stats(conn: &Connection) -> Result<(i64, i64, i64, i64)> {
220    let total: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
221    let unique: i64 = conn
222        .query_row(
223            "SELECT COUNT(DISTINCT content_hash) FROM events",
224            [],
225            |row| row.get(0),
226        )
227        .unwrap_or(0);
228    let oldest: i64 = conn
229        .query_row("SELECT MIN(timestamp) FROM events", [], |row| row.get(0))
230        .unwrap_or(0);
231    let newest: i64 = conn
232        .query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))
233        .unwrap_or(0);
234
235    Ok((total, unique, oldest, newest))
236}
237
238/// Verify core integrity invariants for stored events and chunks.
239pub fn verify_integrity(conn: &Connection) -> Result<IntegrityReport> {
240    let total_events: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
241
242    let missing_or_invalid_hashes: i64 = conn.query_row(
243        "SELECT COUNT(*) FROM events WHERE content_hash IS NULL OR length(content_hash) != 64",
244        [],
245        |row| row.get(0),
246    )?;
247
248    let mut hash_mismatches = 0_i64;
249    let mut stmt = conn.prepare("SELECT content, content_hash FROM events")?;
250    let rows = stmt.query_map([], |row| {
251        let content: String = row.get(0)?;
252        let content_hash: Option<String> = row.get(1)?;
253        Ok((content, content_hash))
254    })?;
255
256    for row in rows {
257        let (content, stored_hash) = row?;
258        let mut hasher = Sha256::new();
259        hasher.update(content.as_bytes());
260        let computed = format!("{:x}", hasher.finalize());
261
262        if stored_hash.as_deref() != Some(computed.as_str()) {
263            hash_mismatches += 1;
264        }
265    }
266
267    let orphan_chunks: i64 = conn.query_row(
268        "SELECT COUNT(*) FROM chunks c LEFT JOIN events e ON e.id = c.event_id WHERE e.id IS NULL",
269        [],
270        |row| row.get(0),
271    )?;
272
273    Ok(IntegrityReport {
274        total_events,
275        missing_or_invalid_hashes,
276        hash_mismatches,
277        orphan_chunks,
278    })
279}