1use rusqlite::{params, Connection, Result};
2use sha2::{Digest, Sha256};
3use std::time::{SystemTime, UNIX_EPOCH};
4use uuid::Uuid;
5
/// Summary of the consistency checks computed by `verify_integrity`.
///
/// All fields are row counts. `Default` yields an all-zero report, and the
/// equality derives let callers compare a report against an expected value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct IntegrityReport {
    /// Number of rows in the `events` table.
    pub total_events: i64,
    /// Events whose `content_hash` is NULL or is not exactly 64 characters long.
    pub missing_or_invalid_hashes: i64,
    /// Events whose stored hash differs from the SHA-256 recomputed from `content`
    /// (rows with a NULL hash are counted here as well).
    pub hash_mismatches: i64,
    /// Rows in `chunks` whose `event_id` matches no row in `events`.
    pub orphan_chunks: i64,
}
13
/// Details of a single event row as written by the append functions in this file.
///
/// Derives `PartialEq`/`Eq` so receipts can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendReceipt {
    /// Identifier of the inserted event (a freshly generated UUID v4 rendered as a string).
    pub id: String,
    /// Value stored in the `timestamp` column: Unix seconds, bumped when necessary so it
    /// is greater than the largest timestamp already present in `events`.
    pub timestamp: i64,
    /// Value stored in the `ingested_at` column; the append functions here set it equal
    /// to `timestamp`.
    pub ingested_at: i64,
    /// Lowercase hexadecimal SHA-256 digest of the event content.
    pub content_hash: String,
}
21
/// Returns the current wall-clock time as whole seconds relative to the Unix epoch.
///
/// Never panics: a system clock set before 1970 yields a negative number of seconds
/// instead of aborting, and the `u64` to `i64` conversion saturates rather than wrapping.
fn unix_now_secs() -> i64 {
    // Clamp before casting so the `as` conversion can never truncate or wrap.
    fn saturating_secs(secs: u64) -> i64 {
        secs.min(i64::MAX as u64) as i64
    }

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => saturating_secs(since_epoch.as_secs()),
        // The clock is before the epoch; report how far before as a negative offset.
        Err(before_epoch) => -saturating_secs(before_epoch.duration().as_secs()),
    }
}
28
29pub fn append(
31 conn: &Connection,
32 source: &str,
33 content: &str,
34 meta: Option<&str>,
35) -> Result<String> {
36 Ok(append_with_receipt(conn, source, content, meta)?.id)
37}
38
39pub fn append_with_receipt(
41 conn: &Connection,
42 source: &str,
43 content: &str,
44 meta: Option<&str>,
45) -> Result<AppendReceipt> {
46 let id = Uuid::new_v4().to_string();
47 let now = unix_now_secs();
48 let last_ts: Option<i64> =
49 conn.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
50 let timestamp = match last_ts {
51 Some(last) if now <= last => last + 1,
52 _ => now,
53 };
54 let ingested_at = timestamp;
55
56 let mut hasher = Sha256::new();
58 hasher.update(content.as_bytes());
59 let content_hash = format!("{:x}", hasher.finalize());
60
61 conn.execute(
62 "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
63 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
64 params![
65 id,
66 timestamp,
67 source,
68 content,
69 meta,
70 ingested_at,
71 content_hash
72 ],
73 )?;
74
75 Ok(AppendReceipt {
76 id,
77 timestamp,
78 ingested_at,
79 content_hash,
80 })
81}
82
83pub fn append_batch(
85 conn: &Connection,
86 source: &str,
87 contents: &[&str],
88 meta: Option<&str>,
89) -> Result<Vec<String>> {
90 Ok(append_batch_with_receipts(conn, source, contents, meta)?
91 .into_iter()
92 .map(|receipt| receipt.id)
93 .collect())
94}
95
96pub fn append_batch_with_receipts(
98 conn: &Connection,
99 source: &str,
100 contents: &[&str],
101 meta: Option<&str>,
102) -> Result<Vec<AppendReceipt>> {
103 let mut receipts = Vec::new();
104
105 let tx = conn.unchecked_transaction()?;
107
108 let now = unix_now_secs();
109 let last_ts: Option<i64> =
110 tx.query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))?;
111 let mut timestamp = match last_ts {
112 Some(last) if now <= last => last + 1,
113 _ => now,
114 };
115
116 for content in contents {
117 let id = Uuid::new_v4().to_string();
118 let ingested_at = timestamp;
119
120 let mut hasher = Sha256::new();
122 hasher.update(content.as_bytes());
123 let content_hash = format!("{:x}", hasher.finalize());
124
125 tx.execute(
126 "INSERT INTO events (id, timestamp, source, content, meta, ingested_at, content_hash)
127 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
128 params![
129 id,
130 timestamp,
131 source,
132 content,
133 meta,
134 ingested_at,
135 content_hash
136 ],
137 )?;
138
139 receipts.push(AppendReceipt {
140 id,
141 timestamp,
142 ingested_at,
143 content_hash,
144 });
145 timestamp += 1;
146 }
147
148 tx.commit()?;
149
150 Ok(receipts)
151}
152
153pub fn append_stdin(
155 conn: &Connection,
156 source: &str,
157 meta: Option<&str>,
158 batch_size: usize,
159) -> std::io::Result<Vec<String>> {
160 use std::io::{self, BufRead};
161
162 let stdin = io::stdin();
163 let mut all_ids = Vec::new();
164 let mut batch: Vec<String> = Vec::new();
165
166 for line in stdin.lock().lines() {
167 let line = line?;
168 let trimmed = line.trim();
169 if !trimmed.is_empty() {
170 batch.push(trimmed.to_string());
171 }
172
173 if batch.len() >= batch_size {
175 let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
176 match append_batch(conn, source, &contents, meta) {
177 Ok(ids) => all_ids.extend(ids),
178 Err(e) => {
179 if !batch.is_empty() {
181 let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
182 if let Ok(ids) = append_batch(conn, source, &contents, meta) {
183 all_ids.extend(ids);
184 }
185 }
186 return Err(io::Error::other(e));
187 }
188 }
189 batch.clear();
190 }
191 }
192
193 if !batch.is_empty() {
195 let contents: Vec<&str> = batch.iter().map(|s| s.as_str()).collect();
196 match append_batch(conn, source, &contents, meta) {
197 Ok(ids) => all_ids.extend(ids),
198 Err(e) => {
199 return Err(io::Error::other(e));
200 }
201 }
202 }
203
204 Ok(all_ids)
205}
206
207pub fn is_duplicate(conn: &Connection, content_hash: &str) -> Result<bool> {
209 let exists: bool = conn.query_row(
210 "SELECT EXISTS(SELECT 1 FROM events WHERE content_hash = ?1)",
211 [content_hash],
212 |row| row.get(0),
213 )?;
214
215 Ok(exists)
216}
217
218pub fn stats(conn: &Connection) -> Result<(i64, i64, i64, i64)> {
220 let total: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
221 let unique: i64 = conn
222 .query_row(
223 "SELECT COUNT(DISTINCT content_hash) FROM events",
224 [],
225 |row| row.get(0),
226 )
227 .unwrap_or(0);
228 let oldest: i64 = conn
229 .query_row("SELECT MIN(timestamp) FROM events", [], |row| row.get(0))
230 .unwrap_or(0);
231 let newest: i64 = conn
232 .query_row("SELECT MAX(timestamp) FROM events", [], |row| row.get(0))
233 .unwrap_or(0);
234
235 Ok((total, unique, oldest, newest))
236}
237
238pub fn verify_integrity(conn: &Connection) -> Result<IntegrityReport> {
240 let total_events: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
241
242 let missing_or_invalid_hashes: i64 = conn.query_row(
243 "SELECT COUNT(*) FROM events WHERE content_hash IS NULL OR length(content_hash) != 64",
244 [],
245 |row| row.get(0),
246 )?;
247
248 let mut hash_mismatches = 0_i64;
249 let mut stmt = conn.prepare("SELECT content, content_hash FROM events")?;
250 let rows = stmt.query_map([], |row| {
251 let content: String = row.get(0)?;
252 let content_hash: Option<String> = row.get(1)?;
253 Ok((content, content_hash))
254 })?;
255
256 for row in rows {
257 let (content, stored_hash) = row?;
258 let mut hasher = Sha256::new();
259 hasher.update(content.as_bytes());
260 let computed = format!("{:x}", hasher.finalize());
261
262 if stored_hash.as_deref() != Some(computed.as_str()) {
263 hash_mismatches += 1;
264 }
265 }
266
267 let orphan_chunks: i64 = conn.query_row(
268 "SELECT COUNT(*) FROM chunks c LEFT JOIN events e ON e.id = c.event_id WHERE e.id IS NULL",
269 [],
270 |row| row.get(0),
271 )?;
272
273 Ok(IntegrityReport {
274 total_events,
275 missing_or_invalid_hashes,
276 hash_mismatches,
277 orphan_chunks,
278 })
279}