1use chrono::{DateTime, TimeZone, Utc};
2use rusqlite::{Connection, Result};
3
/// One row of the `events` table, as returned by the query helpers in this module.
///
/// Every query here selects the columns in the same order
/// (`id, timestamp, source, content, meta, ingested_at, content_hash`) and maps
/// them positionally onto these fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Row identifier; `get_by_id` looks events up by exact match on this value.
    pub id: String,
    /// Event time in seconds since the Unix epoch (rendered by `format_time`).
    pub timestamp: i64,
    /// Origin label; `by_source` filters on exact equality with this value.
    pub source: String,
    /// Event body text; `search` matches against it with `LIKE`.
    pub content: String,
    /// Optional extra metadata (`None` when the column is NULL); its format is not
    /// interpreted in this module.
    pub meta: Option<String>,
    /// Ingestion time in seconds since the Unix epoch (rendered by `format_ingested_at`).
    pub ingested_at: i64,
    /// Hash used to detect duplicate content (`find_duplicates`, `dedup_stats`);
    /// `None` when the column is NULL.
    pub content_hash: Option<String>,
}
14
15impl Event {
16 pub fn format_time(&self) -> String {
17 let dt: DateTime<Utc> = Utc.timestamp_opt(self.timestamp, 0).unwrap();
18 dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
19 }
20
21 pub fn format_ingested_at(&self) -> String {
22 let dt: DateTime<Utc> = Utc.timestamp_opt(self.ingested_at, 0).unwrap();
23 dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
24 }
25
26 pub fn preview_content(&self, max_chars: usize) -> String {
27 let total_chars = self.content.chars().count();
28 if total_chars <= max_chars {
29 self.content.clone()
30 } else {
31 let preview: String = self.content.chars().take(max_chars).collect();
32 format!(
33 "{}...\n\n[Content truncated: {} of {} chars shown]",
34 preview, max_chars, total_chars
35 )
36 }
37 }
38}
39
40pub fn recent(conn: &Connection, limit: i64) -> Result<Vec<Event>> {
41 let mut stmt = conn.prepare(
42 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
43 FROM events
44 ORDER BY timestamp DESC
45 LIMIT ?1",
46 )?;
47
48 let rows = stmt.query_map([limit], |row| {
49 Ok(Event {
50 id: row.get(0)?,
51 timestamp: row.get(1)?,
52 source: row.get(2)?,
53 content: row.get(3)?,
54 meta: row.get(4)?,
55 ingested_at: row.get(5)?,
56 content_hash: row.get(6)?,
57 })
58 })?;
59
60 rows.collect()
61}
62
63pub fn search(conn: &Connection, term: &str) -> Result<Vec<Event>> {
64 let like = format!("%{}%", term);
65
66 let mut stmt = conn.prepare(
67 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
68 FROM events
69 WHERE content LIKE ?1
70 ORDER BY timestamp DESC",
71 )?;
72
73 let rows = stmt.query_map([like], |row| {
74 Ok(Event {
75 id: row.get(0)?,
76 timestamp: row.get(1)?,
77 source: row.get(2)?,
78 content: row.get(3)?,
79 meta: row.get(4)?,
80 ingested_at: row.get(5)?,
81 content_hash: row.get(6)?,
82 })
83 })?;
84
85 rows.collect()
86}
87
88pub fn by_source(conn: &Connection, source: &str, limit: Option<i64>) -> Result<Vec<Event>> {
89 let query = if let Some(lim) = limit {
90 format!(
91 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
92 FROM events
93 WHERE source = ?1
94 ORDER BY timestamp DESC
95 LIMIT {}",
96 lim
97 )
98 } else {
99 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
100 FROM events
101 WHERE source = ?1
102 ORDER BY timestamp DESC"
103 .to_string()
104 };
105
106 let mut stmt = conn.prepare(&query)?;
107
108 let rows = stmt.query_map([source], |row| {
109 Ok(Event {
110 id: row.get(0)?,
111 timestamp: row.get(1)?,
112 source: row.get(2)?,
113 content: row.get(3)?,
114 meta: row.get(4)?,
115 ingested_at: row.get(5)?,
116 content_hash: row.get(6)?,
117 })
118 })?;
119
120 rows.collect()
121}
122
123pub fn get_by_id(conn: &Connection, id: &str) -> Result<Event> {
124 conn.query_row(
125 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
126 FROM events
127 WHERE id = ?1",
128 [id],
129 |row| {
130 Ok(Event {
131 id: row.get(0)?,
132 timestamp: row.get(1)?,
133 source: row.get(2)?,
134 content: row.get(3)?,
135 meta: row.get(4)?,
136 ingested_at: row.get(5)?,
137 content_hash: row.get(6)?,
138 })
139 },
140 )
141}
142
143pub fn by_ingestion_time(
145 conn: &Connection,
146 start: i64,
147 end: i64,
148 limit: Option<i64>,
149) -> Result<Vec<Event>> {
150 let query = if let Some(lim) = limit {
151 format!(
152 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
153 FROM events
154 WHERE ingested_at BETWEEN ?1 AND ?2
155 ORDER BY ingested_at DESC
156 LIMIT {}",
157 lim
158 )
159 } else {
160 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
161 FROM events
162 WHERE ingested_at BETWEEN ?1 AND ?2
163 ORDER BY ingested_at DESC"
164 .to_string()
165 };
166
167 let mut stmt = conn.prepare(&query)?;
168
169 let rows = stmt.query_map([start, end], |row| {
170 Ok(Event {
171 id: row.get(0)?,
172 timestamp: row.get(1)?,
173 source: row.get(2)?,
174 content: row.get(3)?,
175 meta: row.get(4)?,
176 ingested_at: row.get(5)?,
177 content_hash: row.get(6)?,
178 })
179 })?;
180
181 rows.collect()
182}
183
184pub fn dedup_stats(conn: &Connection) -> Result<(i64, i64)> {
186 let total: i64 = conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))?;
187 let unique: i64 = conn
188 .query_row(
189 "SELECT COUNT(DISTINCT content_hash) FROM events",
190 [],
191 |row| row.get(0),
192 )
193 .unwrap_or(0);
194
195 Ok((total, unique))
196}
197
198pub fn find_duplicates(conn: &Connection, content_hash: &str) -> Result<Vec<Event>> {
200 let mut stmt = conn.prepare(
201 "SELECT id, timestamp, source, content, meta, ingested_at, content_hash
202 FROM events
203 WHERE content_hash = ?1
204 ORDER BY ingested_at ASC",
205 )?;
206
207 let rows = stmt.query_map([content_hash], |row| {
208 Ok(Event {
209 id: row.get(0)?,
210 timestamp: row.get(1)?,
211 source: row.get(2)?,
212 content: row.get(3)?,
213 meta: row.get(4)?,
214 ingested_at: row.get(5)?,
215 content_hash: row.get(6)?,
216 })
217 })?;
218
219 rows.collect()
220}