1use rusqlite::{params, Connection, Error as SqlError, Result};
2use sha2::{Digest, Sha256};
3use std::thread;
4use std::time::{SystemTime, UNIX_EPOCH};
5use uuid::Uuid;
6
/// Maximum number of *extra* attempts `with_busy_retry` makes after an operation fails with a
/// SQLite busy/locked error (the initial attempt is not counted).
const BUSY_RETRY_ATTEMPTS: usize = 10;
/// Fixed pause, in milliseconds, that `with_busy_retry` sleeps before each retry.
const BUSY_RETRY_DELAY_MS: u64 = 10;
9
/// Counters produced by [`verify_integrity`] describing the health of the event store.
#[derive(Clone, Copy, Debug)]
pub struct IntegrityReport {
    /// Number of rows in `events` (shadowed events are included).
    pub total_events: i64,
    /// Events whose `content_hash` is NULL or not exactly 64 characters long.
    pub missing_or_invalid_hashes: i64,
    /// Events whose stored `content_hash` differs from the lowercase hex SHA-256 recomputed
    /// from `content`. Every event counted in `missing_or_invalid_hashes` is counted here too.
    pub hash_mismatches: i64,
    /// Rows in `chunks` whose `event_id` does not match any row in `events`.
    pub orphan_chunks: i64,
}
17
/// Details of a single event written by one of the `append*` functions.
#[derive(Clone, Debug)]
pub struct AppendReceipt {
    /// Generated event id (a random v4 UUID rendered as a string).
    pub id: String,
    /// Stored event timestamp in Unix seconds; strictly greater than any timestamp already
    /// present in `events` when it was assigned.
    pub timestamp: i64,
    /// Stored `ingested_at` value; the append functions currently set it equal to `timestamp`.
    pub ingested_at: i64,
    /// Lowercase hex SHA-256 digest of the event content.
    pub content_hash: String,
}
25
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Never panics:
/// - If the system clock is set before 1970, the result is the negated distance to the epoch
///   instead of aborting the process.
/// - A distance that does not fit in `i64` saturates instead of wrapping.
fn unix_now_secs() -> i64 {
    // Clamp before the cast so a u64 above i64::MAX cannot wrap to a negative value.
    let clamp = |secs: u64| secs.min(i64::MAX as u64) as i64;
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => clamp(elapsed.as_secs()),
        // The clock is behind the epoch; `duration()` is how far behind.
        Err(behind) => -clamp(behind.duration().as_secs()),
    }
}
32
33pub fn append(
35 conn: &Connection,
36 source: &str,
37 content: &str,
38 meta: Option<&str>,
39) -> Result<String> {
40 Ok(append_with_receipt(conn, source, content, meta)?.id)
41}
42
43pub fn append_with_receipt(
45 conn: &Connection,
46 source: &str,
47 content: &str,
48 meta: Option<&str>,
49) -> Result<AppendReceipt> {
50 with_busy_retry(|| {
51 let tx = conn.unchecked_transaction()?;
52 let receipt = append_with_receipt_in_tx(&tx, source, content, meta)?;
53 tx.commit()?;
54 Ok(receipt)
55 })
56}
57
58pub(crate) fn append_with_receipt_in_tx(
59 conn: &Connection,
60 source: &str,
61 content: &str,
62 meta: Option<&str>,
63) -> Result<AppendReceipt> {
64 let id = Uuid::new_v4().to_string();
65 let now = unix_now_secs();
66 let last_ts: Option<i64> =
67 conn.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
68 let timestamp = match last_ts {
69 Some(last) if now <= last => last + 1,
70 _ => now,
71 };
72 let ingested_at = timestamp;
73
74 let mut hasher = Sha256::new();
76 hasher.update(content.as_bytes());
77 let content_hash = format!("{:x}", hasher.finalize());
78
79 conn.execute(
80 "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
81 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
82 params![
83 id,
84 timestamp,
85 source,
86 content,
87 meta,
88 ingested_at,
89 content_hash
90 ],
91 )?;
92
93 Ok(AppendReceipt {
94 id,
95 timestamp,
96 ingested_at,
97 content_hash,
98 })
99}
100
101pub fn append_batch(
103 conn: &Connection,
104 source: &str,
105 contents: &[&str],
106 meta: Option<&str>,
107) -> Result<Vec<String>> {
108 Ok(append_batch_with_receipts(conn, source, contents, meta)?
109 .into_iter()
110 .map(|receipt| receipt.id)
111 .collect())
112}
113
114pub fn append_batch_with_receipts(
116 conn: &Connection,
117 source: &str,
118 contents: &[&str],
119 meta: Option<&str>,
120) -> Result<Vec<AppendReceipt>> {
121 with_busy_retry(|| {
122 let tx = conn.unchecked_transaction()?;
123 let receipts = append_batch_with_receipts_in_tx(&tx, source, contents, meta)?;
124 tx.commit()?;
125 Ok(receipts)
126 })
127}
128
129pub(crate) fn append_batch_with_receipts_in_tx(
130 conn: &Connection,
131 source: &str,
132 contents: &[&str],
133 meta: Option<&str>,
134) -> Result<Vec<AppendReceipt>> {
135 let mut receipts = Vec::with_capacity(contents.len());
136
137 let now = unix_now_secs();
138 let last_ts: Option<i64> =
139 conn.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
140 let mut timestamp = match last_ts {
141 Some(last) if now <= last => last + 1,
142 _ => now,
143 };
144
145 for content in contents {
146 let receipt = append_with_receipt_at_timestamp(conn, source, content, meta, timestamp)?;
147 receipts.push(receipt);
148 timestamp += 1;
149 }
150
151 Ok(receipts)
152}
153
154fn append_with_receipt_at_timestamp(
155 conn: &Connection,
156 source: &str,
157 content: &str,
158 meta: Option<&str>,
159 timestamp: i64,
160) -> Result<AppendReceipt> {
161 let id = Uuid::new_v4().to_string();
162 let ingested_at = timestamp;
163
164 let mut hasher = Sha256::new();
165 hasher.update(content.as_bytes());
166 let content_hash = format!("{:x}", hasher.finalize());
167
168 conn.execute(
169 "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
170 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
171 params![
172 id,
173 timestamp,
174 source,
175 content,
176 meta,
177 ingested_at,
178 content_hash
179 ],
180 )?;
181
182 Ok(AppendReceipt {
183 id,
184 timestamp,
185 ingested_at,
186 content_hash,
187 })
188}
189
190fn with_busy_retry<T, F>(mut operation: F) -> Result<T>
191where
192 F: FnMut() -> Result<T>,
193{
194 let mut attempts = 0;
195
196 loop {
197 match operation() {
198 Ok(value) => return Ok(value),
199 Err(err) if is_busy_error(&err) && attempts < BUSY_RETRY_ATTEMPTS => {
200 attempts += 1;
201 thread::sleep(std::time::Duration::from_millis(BUSY_RETRY_DELAY_MS));
202 }
203 Err(err) => return Err(err),
204 }
205 }
206}
207
208fn is_busy_error(err: &SqlError) -> bool {
209 matches!(
210 err,
211 SqlError::SqliteFailure(code, _)
212 if matches!(
213 code.code,
214 rusqlite::ffi::ErrorCode::DatabaseBusy | rusqlite::ffi::ErrorCode::DatabaseLocked
215 )
216 )
217}
218
219pub fn append_stdin(
221 conn: &Connection,
222 source: &str,
223 meta: Option<&str>,
224 batch_size: usize,
225) -> std::io::Result<Vec<String>> {
226 use std::io::{self, BufRead};
227
228 let stdin = io::stdin();
229 let mut all_ids = Vec::new();
230 let mut batch: Vec<String> = Vec::new();
231 let effective_batch_size = batch_size.max(1);
232
233 for line in stdin.lock().lines() {
234 let line = line?;
235 let trimmed = line.trim();
236 if !trimmed.is_empty() {
237 batch.push(trimmed.to_string());
238 }
239
240 if batch.len() >= effective_batch_size {
242 let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
243 match append_batch(conn, source, &contents, meta) {
244 Ok(ids) => all_ids.extend(ids),
245 Err(e) => return Err(io::Error::other(e)),
246 }
247 batch.clear();
248 }
249 }
250
251 if !batch.is_empty() {
253 let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
254 match append_batch(conn, source, &contents, meta) {
255 Ok(ids) => all_ids.extend(ids),
256 Err(e) => {
257 return Err(io::Error::other(e));
258 }
259 }
260 }
261
262 Ok(all_ids)
263}
264
265pub fn is_duplicate(conn: &Connection, content_hash: &str) -> Result<bool> {
267 let exists: bool = conn.query_row(
268 "SELECT EXISTS(SELECT 1 FROM events WHERE content_hash = ?1)",
269 [content_hash],
270 |row| row.get(0),
271 )?;
272
273 Ok(exists)
274}
275
276pub fn stats(conn: &Connection) -> Result<(i64, i64, i64, i64)> {
278 let total: i64 = conn.query_row(
279 "SELECT COUNT(*)
280 FROM events
281 WHERE NOT EXISTS (
282 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
283 )",
284 [],
285 |row| row.get(0),
286 )?;
287 let unique: i64 = conn
288 .query_row(
289 "SELECT COUNT(DISTINCT content_hash)
290 FROM events
291 WHERE NOT EXISTS (
292 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
293 )",
294 [],
295 |row| row.get(0),
296 )
297 .unwrap_or(0);
298 let oldest: i64 = conn
299 .query_row(
300 "SELECT MIN(timestamp)
301 FROM events
302 WHERE NOT EXISTS (
303 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
304 )",
305 [],
306 |row| row.get(0),
307 )
308 .unwrap_or(0);
309 let newest: i64 = conn
310 .query_row(
311 "SELECT MAX(timestamp)
312 FROM events
313 WHERE NOT EXISTS (
314 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
315 )",
316 [],
317 |row| row.get(0),
318 )
319 .unwrap_or(0);
320
321 Ok((total, unique, oldest, newest))
322}
323
324pub fn verify_integrity(conn: &Connection) -> Result<IntegrityReport> {
326 let total_events: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
327
328 let missing_or_invalid_hashes: i64 = conn.query_row(
329 "SELECT COUNT(*) FROM events WHERE content_hash IS NULL OR length(content_hash) != 64",
330 [],
331 |row| row.get(0),
332 )?;
333
334 let mut hash_mismatches = 0_i64;
335 let mut stmt = conn.prepare("SELECT content, content_hash FROM events")?;
336 let rows = stmt.query_map([], |row| {
337 let content: String = row.get(0)?;
338 let content_hash: Option<String> = row.get(1)?;
339 Ok((content, content_hash))
340 })?;
341
342 for row in rows {
343 let (content, stored_hash) = row?;
344 let mut hasher = Sha256::new();
345 hasher.update(content.as_bytes());
346 let computed = format!("{:x}", hasher.finalize());
347
348 if stored_hash.as_deref() != Some(computed.as_str()) {
349 hash_mismatches += 1;
350 }
351 }
352
353 let orphan_chunks: i64 = conn.query_row(
354 "SELECT COUNT(*) FROM chunks c LEFT JOIN events e ON e.id = c.event_id WHERE e.id IS NULL",
355 [],
356 |row| row.get(0),
357 )?;
358
359 Ok(IntegrityReport {
360 total_events,
361 missing_or_invalid_hashes,
362 hash_mismatches,
363 orphan_chunks,
364 })
365}