Skip to main content

mirror_log/
log.rs

1use rusqlite::{params, Connection, Error as SqlError, Result};
2use sha2::{Digest, Sha256};
3use std::thread;
4use std::time::{SystemTime, UNIX_EPOCH};
5use uuid::Uuid;
6
/// Maximum number of times a busy/locked operation is re-run after its
/// first attempt (see `with_busy_retry`).
const BUSY_RETRY_ATTEMPTS: usize = 10;
/// Pause, in milliseconds, slept between two consecutive retry attempts.
const BUSY_RETRY_DELAY_MS: u64 = 10;
9
/// Summary of the checks performed by `verify_integrity`.
///
/// All counters are row counts. `Default` yields an all-zero report, and the
/// type is comparable so callers and tests can assert on whole reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct IntegrityReport {
    /// Number of rows in the `events` table.
    pub total_events: i64,
    /// Events whose `content_hash` is NULL or not exactly 64 characters long.
    pub missing_or_invalid_hashes: i64,
    /// Events whose stored hash differs from the SHA-256 hex digest of their
    /// content (a NULL stored hash is counted here as well).
    pub hash_mismatches: i64,
    /// Rows in `chunks` whose `event_id` matches no row in `events`.
    pub orphan_chunks: i64,
}
17
/// Record of what was actually persisted for one appended event.
///
/// Comparable by value so callers and tests can assert on whole receipts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendReceipt {
    /// Identifier of the stored event (a freshly generated UUID v4 string).
    pub id: String,
    /// Timestamp stored for the event, in seconds; the append helpers bump it
    /// past the newest existing event timestamp when the clock has not advanced.
    pub timestamp: i64,
    /// Ingestion time stored for the event; the append helpers in this module
    /// currently set it equal to `timestamp`.
    pub ingested_at: i64,
    /// Lowercase hex SHA-256 digest of the event content.
    pub content_hash: String,
}
25
/// Current wall-clock time as whole seconds relative to the Unix epoch.
///
/// Never panics: a system clock set before 1970 yields a negative value
/// (seconds before the epoch) instead of aborting the caller, and values that
/// do not fit in `i64` saturate rather than silently wrapping.
fn unix_now_secs() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        // `duration()` is how far *before* the epoch the clock currently is.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map_or(i64::MIN, |secs| -secs),
    }
}
32
33/// Append a single event to the log
34pub fn append(
35    conn: &Connection,
36    source: &str,
37    content: &str,
38    meta: Option<&str>,
39) -> Result<String> {
40    Ok(append_with_receipt(conn, source, content, meta)?.id)
41}
42
43/// Append a single event and return the canonical persistence receipt.
44pub fn append_with_receipt(
45    conn: &Connection,
46    source: &str,
47    content: &str,
48    meta: Option<&str>,
49) -> Result<AppendReceipt> {
50    with_busy_retry(|| {
51        let tx = conn.unchecked_transaction()?;
52        let receipt = append_with_receipt_in_tx(&tx, source, content, meta)?;
53        tx.commit()?;
54        Ok(receipt)
55    })
56}
57
58pub(crate) fn append_with_receipt_in_tx(
59    conn: &Connection,
60    source: &str,
61    content: &str,
62    meta: Option<&str>,
63) -> Result<AppendReceipt> {
64    let id = Uuid::new_v4().to_string();
65    let now = unix_now_secs();
66    let last_ts: Option<i64> =
67        conn.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
68    let timestamp = match last_ts {
69        Some(last) if now <= last => last + 1,
70        _ => now,
71    };
72    let ingested_at = timestamp;
73
74    // Calculate content hash for deduplication
75    let mut hasher = Sha256::new();
76    hasher.update(content.as_bytes());
77    let content_hash = format!("{:x}", hasher.finalize());
78
79    conn.execute(
80        "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
81         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
82        params![
83            id,
84            timestamp,
85            source,
86            content,
87            meta,
88            ingested_at,
89            content_hash
90        ],
91    )?;
92
93    Ok(AppendReceipt {
94        id,
95        timestamp,
96        ingested_at,
97        content_hash,
98    })
99}
100
101/// Append multiple events from a batch in a single transaction
102pub fn append_batch(
103    conn: &Connection,
104    source: &str,
105    contents: &[&str],
106    meta: Option<&str>,
107) -> Result<Vec<String>> {
108    Ok(append_batch_with_receipts(conn, source, contents, meta)?
109        .into_iter()
110        .map(|receipt| receipt.id)
111        .collect())
112}
113
114/// Append multiple events and return a receipt for each inserted event.
115pub fn append_batch_with_receipts(
116    conn: &Connection,
117    source: &str,
118    contents: &[&str],
119    meta: Option<&str>,
120) -> Result<Vec<AppendReceipt>> {
121    with_busy_retry(|| {
122        let tx = conn.unchecked_transaction()?;
123        let receipts = append_batch_with_receipts_in_tx(&tx, source, contents, meta)?;
124        tx.commit()?;
125        Ok(receipts)
126    })
127}
128
129pub(crate) fn append_batch_with_receipts_in_tx(
130    conn: &Connection,
131    source: &str,
132    contents: &[&str],
133    meta: Option<&str>,
134) -> Result<Vec<AppendReceipt>> {
135    let mut receipts = Vec::with_capacity(contents.len());
136
137    let now = unix_now_secs();
138    let last_ts: Option<i64> =
139        conn.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
140    let mut timestamp = match last_ts {
141        Some(last) if now <= last => last + 1,
142        _ => now,
143    };
144
145    for content in contents {
146        let receipt = append_with_receipt_at_timestamp(conn, source, content, meta, timestamp)?;
147        receipts.push(receipt);
148        timestamp += 1;
149    }
150
151    Ok(receipts)
152}
153
154fn append_with_receipt_at_timestamp(
155    conn: &Connection,
156    source: &str,
157    content: &str,
158    meta: Option<&str>,
159    timestamp: i64,
160) -> Result<AppendReceipt> {
161    let id = Uuid::new_v4().to_string();
162    let ingested_at = timestamp;
163
164    let mut hasher = Sha256::new();
165    hasher.update(content.as_bytes());
166    let content_hash = format!("{:x}", hasher.finalize());
167
168    conn.execute(
169        "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
170         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
171        params![
172            id,
173            timestamp,
174            source,
175            content,
176            meta,
177            ingested_at,
178            content_hash
179        ],
180    )?;
181
182    Ok(AppendReceipt {
183        id,
184        timestamp,
185        ingested_at,
186        content_hash,
187    })
188}
189
190fn with_busy_retry<T, F>(mut operation: F) -> Result<T>
191where
192    F: FnMut() -> Result<T>,
193{
194    let mut attempts = 0;
195
196    loop {
197        match operation() {
198            Ok(value) => return Ok(value),
199            Err(err) if is_busy_error(&err) && attempts < BUSY_RETRY_ATTEMPTS => {
200                attempts += 1;
201                thread::sleep(std::time::Duration::from_millis(BUSY_RETRY_DELAY_MS));
202            }
203            Err(err) => return Err(err),
204        }
205    }
206}
207
208fn is_busy_error(err: &SqlError) -> bool {
209    matches!(
210        err,
211        SqlError::SqliteFailure(code, _)
212            if matches!(
213                code.code,
214                rusqlite::ffi::ErrorCode::DatabaseBusy | rusqlite::ffi::ErrorCode::DatabaseLocked
215            )
216    )
217}
218
219/// Append events from stdin with configurable batch size
220pub fn append_stdin(
221    conn: &Connection,
222    source: &str,
223    meta: Option<&str>,
224    batch_size: usize,
225) -> std::io::Result<Vec<String>> {
226    use std::io::{self, BufRead};
227
228    let stdin = io::stdin();
229    let mut all_ids = Vec::new();
230    let mut batch: Vec<String> = Vec::new();
231    let effective_batch_size = batch_size.max(1);
232
233    for line in stdin.lock().lines() {
234        let line = line?;
235        let trimmed = line.trim();
236        if !trimmed.is_empty() {
237            batch.push(trimmed.to_string());
238        }
239
240        // Execute batch when we reach the configured size
241        if batch.len() >= effective_batch_size {
242            let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
243            match append_batch(conn, source, &contents, meta) {
244                Ok(ids) => all_ids.extend(ids),
245                Err(e) => return Err(io::Error::other(e)),
246            }
247            batch.clear();
248        }
249    }
250
251    // Don't forget the last batch if it's non-empty
252    if !batch.is_empty() {
253        let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
254        match append_batch(conn, source, &contents, meta) {
255            Ok(ids) => all_ids.extend(ids),
256            Err(e) => {
257                return Err(io::Error::other(e));
258            }
259        }
260    }
261
262    Ok(all_ids)
263}
264
265/// Check if an event with the same content hash already exists
266pub fn is_duplicate(conn: &Connection, content_hash: &str) -> Result<bool> {
267    let exists: bool = conn.query_row(
268        "SELECT EXISTS(SELECT 1 FROM events WHERE content_hash = ?1)",
269        [content_hash],
270        |row| row.get(0),
271    )?;
272
273    Ok(exists)
274}
275
276/// Get statistics about the events
277pub fn stats(conn: &Connection) -> Result<(i64, i64, i64, i64)> {
278    let total: i64 = conn.query_row(
279        "SELECT COUNT(*)
280         FROM events
281         WHERE NOT EXISTS (
282             SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
283         )",
284        [],
285        |row| row.get(0),
286    )?;
287    let unique: i64 = conn
288        .query_row(
289            "SELECT COUNT(DISTINCT content_hash)
290             FROM events
291             WHERE NOT EXISTS (
292                 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
293             )",
294            [],
295            |row| row.get(0),
296        )
297        .unwrap_or(0);
298    let oldest: i64 = conn
299        .query_row(
300            "SELECT MIN(timestamp)
301             FROM events
302             WHERE NOT EXISTS (
303                 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
304             )",
305            [],
306            |row| row.get(0),
307        )
308        .unwrap_or(0);
309    let newest: i64 = conn
310        .query_row(
311            "SELECT MAX(timestamp)
312             FROM events
313             WHERE NOT EXISTS (
314                 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
315             )",
316            [],
317            |row| row.get(0),
318        )
319        .unwrap_or(0);
320
321    Ok((total, unique, oldest, newest))
322}
323
324/// Verify core integrity invariants for stored events and chunks.
325pub fn verify_integrity(conn: &Connection) -> Result<IntegrityReport> {
326    let total_events: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
327
328    let missing_or_invalid_hashes: i64 = conn.query_row(
329        "SELECT COUNT(*) FROM events WHERE content_hash IS NULL OR length(content_hash) != 64",
330        [],
331        |row| row.get(0),
332    )?;
333
334    let mut hash_mismatches = 0_i64;
335    let mut stmt = conn.prepare("SELECT content, content_hash FROM events")?;
336    let rows = stmt.query_map([], |row| {
337        let content: String = row.get(0)?;
338        let content_hash: Option<String> = row.get(1)?;
339        Ok((content, content_hash))
340    })?;
341
342    for row in rows {
343        let (content, stored_hash) = row?;
344        let mut hasher = Sha256::new();
345        hasher.update(content.as_bytes());
346        let computed = format!("{:x}", hasher.finalize());
347
348        if stored_hash.as_deref() != Some(computed.as_str()) {
349            hash_mismatches += 1;
350        }
351    }
352
353    let orphan_chunks: i64 = conn.query_row(
354        "SELECT COUNT(*) FROM chunks c LEFT JOIN events e ON e.id = c.event_id WHERE e.id IS NULL",
355        [],
356        |row| row.get(0),
357    )?;
358
359    Ok(IntegrityReport {
360        total_events,
361        missing_or_invalid_hashes,
362        hash_mismatches,
363        orphan_chunks,
364    })
365}