1use chrono::{DateTime, TimeZone, Utc};
2use rusqlite::{Connection, Result};
3
/// One row of the `events` table, as produced by the query helpers in this module.
///
/// `Clone`, `PartialEq` and `Eq` are derived in addition to `Debug` so that query
/// results can be duplicated and compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Identifier of the event; `shadow_state.event_id` is matched against this value.
    pub id: String,
    /// Event time in Unix seconds (rendered as UTC by `format_time`).
    pub timestamp: i64,
    /// Origin of the event; `by_source` filters on exact equality with this column.
    pub source: String,
    /// Event body; `search` matches against this column with `LIKE`.
    pub content: String,
    /// Optional metadata string. NOTE(review): format is not visible here — confirm with the writer side.
    pub meta: Option<String>,
    /// Ingestion time in Unix seconds (rendered as UTC by `format_ingested_at`).
    pub ingested_at: i64,
    /// Optional hash of the content, used by `dedup_stats` and `find_duplicates`.
    pub content_hash: Option<String>,
}
14
15impl Event {
16 pub fn format_time(&self) -> String {
17 let dt: DateTime<Utc> = Utc.timestamp_opt(self.timestamp, 0).unwrap();
18 dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
19 }
20
21 pub fn format_ingested_at(&self) -> String {
22 let dt: DateTime<Utc> = Utc.timestamp_opt(self.ingested_at, 0).unwrap();
23 dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
24 }
25
26 pub fn preview_content(&self, max_chars: usize) -> String {
27 let total_chars = self.content.chars().count();
28 if total_chars <= max_chars {
29 self.content.clone()
30 } else {
31 let preview: String = self.content.chars().take(max_chars).collect();
32 format!(
33 "{}...\n\n[Content truncated: {} of {} chars shown]",
34 preview, max_chars, total_chars
35 )
36 }
37 }
38}
39
40pub fn recent(conn: &Connection, limit: i64) -> Result<Vec<Event>> {
41 let mut stmt = conn.prepare(
42 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
43 FROM events
44 WHERE NOT EXISTS (
45 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
46 )
47 ORDER BY timestamp DESC
48 LIMIT ?1",
49 )?;
50
51 let rows = stmt.query_map([limit], |row| {
52 Ok(Event {
53 id: row.get(0)?,
54 timestamp: row.get(1)?,
55 source: row.get(2)?,
56 content: row.get(3)?,
57 meta: row.get(4)?,
58 ingested_at: row.get(5)?,
59 content_hash: row.get(6)?,
60 })
61 })?;
62
63 rows.collect()
64}
65
66pub fn search(conn: &Connection, term: &str) -> Result<Vec<Event>> {
67 let like = format!("%{}%", term);
68
69 let mut stmt = conn.prepare(
70 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
71 FROM events
72 WHERE content LIKE ?1
73 AND NOT EXISTS (
74 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
75 )
76 ORDER BY timestamp DESC",
77 )?;
78
79 let rows = stmt.query_map([like], |row| {
80 Ok(Event {
81 id: row.get(0)?,
82 timestamp: row.get(1)?,
83 source: row.get(2)?,
84 content: row.get(3)?,
85 meta: row.get(4)?,
86 ingested_at: row.get(5)?,
87 content_hash: row.get(6)?,
88 })
89 })?;
90
91 rows.collect()
92}
93
94pub fn by_source(conn: &Connection, source: &str, limit: Option<i64>) -> Result<Vec<Event>> {
95 let query = if let Some(lim) = limit {
96 format!(
97 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
98 FROM events
99 WHERE source = ?1
100 AND NOT EXISTS (
101 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
102 )
103 ORDER BY timestamp DESC
104 LIMIT {}",
105 lim
106 )
107 } else {
108 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
109 FROM events
110 WHERE source = ?1
111 AND NOT EXISTS (
112 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
113 )
114 ORDER BY timestamp DESC"
115 .to_string()
116 };
117
118 let mut stmt = conn.prepare(&query)?;
119
120 let rows = stmt.query_map([source], |row| {
121 Ok(Event {
122 id: row.get(0)?,
123 timestamp: row.get(1)?,
124 source: row.get(2)?,
125 content: row.get(3)?,
126 meta: row.get(4)?,
127 ingested_at: row.get(5)?,
128 content_hash: row.get(6)?,
129 })
130 })?;
131
132 rows.collect()
133}
134
135pub fn get_by_id(conn: &Connection, id: &str) -> Result<Event> {
136 conn.query_row(
137 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
138 FROM events
139 WHERE id = ?1",
140 [id],
141 |row| {
142 Ok(Event {
143 id: row.get(0)?,
144 timestamp: row.get(1)?,
145 source: row.get(2)?,
146 content: row.get(3)?,
147 meta: row.get(4)?,
148 ingested_at: row.get(5)?,
149 content_hash: row.get(6)?,
150 })
151 },
152 )
153}
154
155pub fn by_ingestion_time(
157 conn: &Connection,
158 start: i64,
159 end: i64,
160 limit: Option<i64>,
161) -> Result<Vec<Event>> {
162 let query = if let Some(lim) = limit {
163 format!(
164 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
165 FROM events
166 WHERE ingested_at BETWEEN ?1 AND ?2
167 AND NOT EXISTS (
168 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
169 )
170 ORDER BY ingested_at DESC
171 LIMIT {}",
172 lim
173 )
174 } else {
175 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
176 FROM events
177 WHERE ingested_at BETWEEN ?1 AND ?2
178 AND NOT EXISTS (
179 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
180 )
181 ORDER BY ingested_at DESC"
182 .to_string()
183 };
184
185 let mut stmt = conn.prepare(&query)?;
186
187 let rows = stmt.query_map([start, end], |row| {
188 Ok(Event {
189 id: row.get(0)?,
190 timestamp: row.get(1)?,
191 source: row.get(2)?,
192 content: row.get(3)?,
193 meta: row.get(4)?,
194 ingested_at: row.get(5)?,
195 content_hash: row.get(6)?,
196 })
197 })?;
198
199 rows.collect()
200}
201
202pub fn dedup_stats(conn: &Connection) -> Result<(i64, i64)> {
204 let total: i64 = conn.query_row(
205 "SELECT COUNT(*)
206 FROM events
207 WHERE NOT EXISTS (
208 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
209 )",
210 [],
211 |row| row.get(0),
212 )?;
213 let unique: i64 = conn
214 .query_row(
215 "SELECT COUNT(DISTINCT content_hash)
216 FROM events
217 WHERE NOT EXISTS (
218 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
219 )",
220 [],
221 |row| row.get(0),
222 )
223 .unwrap_or(0);
224
225 Ok((total, unique))
226}
227
228pub fn find_duplicates(conn: &Connection, content_hash: &str) -> Result<Vec<Event>> {
230 let mut stmt = conn.prepare(
231 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
232 FROM events
233 WHERE content_hash = ?1
234 AND NOT EXISTS (
235 SELECT 1 FROM shadow_state s WHERE s.event_id = events.id
236 )
237 ORDER BY ingested_at ASC",
238 )?;
239
240 let rows = stmt.query_map([content_hash], |row| {
241 Ok(Event {
242 id: row.get(0)?,
243 timestamp: row.get(1)?,
244 source: row.get(2)?,
245 content: row.get(3)?,
246 meta: row.get(4)?,
247 ingested_at: row.get(5)?,
248 content_hash: row.get(6)?,
249 })
250 })?;
251
252 rows.collect()
253}