1use rusqlite::{Connection, params};
2use serde::Serialize;
3
4use crate::errors::AppError;
5
/// Filter choosing which inbox items a listing query returns.
///
/// Each variant is translated into a SQL `where` fragment by `state_sql`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryState {
    /// No filtering: every item matches.
    All,
    /// Only items that have no active derivation with status `accepted`.
    Pending,
    /// Only items that have an active derivation with status `accepted`.
    Enriched,
}
12
13#[derive(Debug, Clone, Serialize)]
14pub struct AddedItem {
15 pub item_id: i64,
16 pub created_at: String,
17 pub source: String,
18 pub text: String,
19}
20
21#[derive(Debug, Clone, Serialize)]
22pub struct UpdatedItem {
23 pub item_id: i64,
24 pub updated_at: String,
25 pub text: String,
26 pub cleared_derivations: i64,
27 pub cleared_workflow_anchors: i64,
28}
29
30#[derive(Debug, Clone, Serialize)]
31pub struct DeletedItem {
32 pub item_id: i64,
33 pub deleted_at: String,
34 pub removed_derivations: i64,
35 pub removed_workflow_anchors: i64,
36}
37
38#[derive(Debug, Clone, Serialize)]
39pub struct ListItem {
40 pub item_id: i64,
41 pub created_at: String,
42 pub state: String,
43 pub text_preview: String,
44 pub content_type: Option<String>,
45 pub validation_status: Option<String>,
46}
47
48#[derive(Debug, Clone, Serialize)]
49pub struct FetchItem {
50 pub item_id: i64,
51 pub created_at: String,
52 pub source: String,
53 pub text: String,
54 pub state: String,
55 pub content_type: Option<String>,
56 pub validation_status: Option<String>,
57}
58
/// Position marker used to resume a paged fetch after a given item.
///
/// The pair mirrors the `(created_at, item_id)` ordering used by the paged
/// queries, so a page can continue strictly after the row it identifies.
#[derive(Debug, Clone)]
pub struct FetchCursor {
    /// Id of the item the cursor points at.
    pub item_id: i64,
    /// Creation timestamp of that same item.
    pub created_at: String,
}
64
65pub fn add_item(
66 conn: &Connection,
67 text: &str,
68 source: &str,
69 created_at: Option<&str>,
70) -> Result<AddedItem, AppError> {
71 if let Some(created_at) = created_at {
72 conn.execute(
73 "insert into inbox_items(source, raw_text, created_at) values(?1, ?2, ?3)",
74 params![source, text, created_at],
75 )
76 .map_err(AppError::db_write)?;
77 } else {
78 conn.execute(
79 "insert into inbox_items(source, raw_text) values(?1, ?2)",
80 params![source, text],
81 )
82 .map_err(AppError::db_write)?;
83 }
84
85 let item_id = conn.last_insert_rowid();
86 let created_at: String = conn
87 .query_row(
88 "select created_at from inbox_items where item_id = ?1",
89 params![item_id],
90 |row| row.get(0),
91 )
92 .map_err(AppError::db_query)?;
93
94 Ok(AddedItem {
95 item_id,
96 created_at,
97 source: source.to_string(),
98 text: text.to_string(),
99 })
100}
101
102pub fn update_item(conn: &Connection, item_id: i64, text: &str) -> Result<UpdatedItem, AppError> {
103 let text = text.trim();
104 if text.is_empty() {
105 return Err(AppError::usage("update requires a non-empty text argument"));
106 }
107
108 let exists: i64 = conn
109 .query_row(
110 "select count(*) from inbox_items where item_id = ?1",
111 params![item_id],
112 |row| row.get(0),
113 )
114 .map_err(AppError::db_query)?;
115 if exists == 0 {
116 return Err(AppError::usage("item_id does not exist")
117 .with_code("invalid-item-id")
118 .with_details(serde_json::json!({ "item_id": item_id })));
119 }
120
121 let removed_workflow_anchors = conn
122 .execute(
123 "delete from workflow_item_anchors where item_id = ?1",
124 params![item_id],
125 )
126 .map_err(AppError::db_write)? as i64;
127 let cleared_derivations = conn
128 .execute(
129 "delete from item_derivations where item_id = ?1",
130 params![item_id],
131 )
132 .map_err(AppError::db_write)? as i64;
133 conn.execute(
134 "update inbox_items set raw_text = ?1 where item_id = ?2",
135 params![text, item_id],
136 )
137 .map_err(AppError::db_write)?;
138 conn.execute(
139 "delete from tags
140 where not exists (
141 select 1
142 from item_tags it
143 where it.tag_id = tags.tag_id
144 )",
145 [],
146 )
147 .map_err(AppError::db_write)?;
148
149 let updated_at: String = conn
150 .query_row(
151 "select updated_at from item_search_documents where item_id = ?1",
152 params![item_id],
153 |row| row.get(0),
154 )
155 .map_err(AppError::db_query)?;
156
157 Ok(UpdatedItem {
158 item_id,
159 updated_at,
160 text: text.to_string(),
161 cleared_derivations,
162 cleared_workflow_anchors: removed_workflow_anchors,
163 })
164}
165
166pub fn delete_item_hard(conn: &Connection, item_id: i64) -> Result<DeletedItem, AppError> {
167 let exists: i64 = conn
168 .query_row(
169 "select count(*) from inbox_items where item_id = ?1",
170 params![item_id],
171 |row| row.get(0),
172 )
173 .map_err(AppError::db_query)?;
174 if exists == 0 {
175 return Err(AppError::usage("item_id does not exist")
176 .with_code("invalid-item-id")
177 .with_details(serde_json::json!({ "item_id": item_id })));
178 }
179
180 let removed_workflow_anchors = conn
181 .execute(
182 "delete from workflow_item_anchors where item_id = ?1",
183 params![item_id],
184 )
185 .map_err(AppError::db_write)? as i64;
186 let removed_derivations = conn
187 .execute(
188 "delete from item_derivations where item_id = ?1",
189 params![item_id],
190 )
191 .map_err(AppError::db_write)? as i64;
192 conn.execute(
193 "delete from item_search_documents where item_id = ?1",
194 params![item_id],
195 )
196 .map_err(AppError::db_write)?;
197 conn.execute(
198 "delete from inbox_items where item_id = ?1",
199 params![item_id],
200 )
201 .map_err(AppError::db_write)?;
202 conn.execute(
203 "delete from tags
204 where not exists (
205 select 1
206 from item_tags it
207 where it.tag_id = tags.tag_id
208 )",
209 [],
210 )
211 .map_err(AppError::db_write)?;
212
213 let deleted_at: String = conn
214 .query_row("select strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", [], |row| {
215 row.get(0)
216 })
217 .map_err(AppError::db_query)?;
218
219 Ok(DeletedItem {
220 item_id,
221 deleted_at,
222 removed_derivations,
223 removed_workflow_anchors,
224 })
225}
226
227pub fn list_items(
228 conn: &Connection,
229 state: QueryState,
230 limit: usize,
231 offset: usize,
232) -> Result<Vec<ListItem>, AppError> {
233 let state_filter = state_sql(state);
234 let sql = format!(
235 "select
236 i.item_id,
237 i.created_at,
238 case
239 when ad.derivation_id is not null then 'enriched'
240 else 'pending'
241 end as state,
242 substr(i.raw_text, 1, 80) as text_preview,
243 json_extract(ad.payload_json, '$.content_type') as content_type,
244 json_extract(ad.payload_json, '$.validation_status') as validation_status
245 from inbox_items i
246 left join item_derivations ad
247 on ad.derivation_id = (
248 select d.derivation_id
249 from item_derivations d
250 where d.item_id = i.item_id
251 and d.is_active = 1
252 and d.status = 'accepted'
253 order by d.derivation_version desc, d.derivation_id desc
254 limit 1
255 )
256 where {state_filter}
257 order by i.created_at desc, i.item_id desc
258 limit ?1 offset ?2"
259 );
260
261 let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
262 let rows = stmt
263 .query_map(params![limit as i64, offset as i64], |row| {
264 Ok(ListItem {
265 item_id: row.get(0)?,
266 created_at: row.get(1)?,
267 state: row.get(2)?,
268 text_preview: row.get(3)?,
269 content_type: row.get(4)?,
270 validation_status: row.get(5)?,
271 })
272 })
273 .map_err(AppError::db_query)?;
274
275 rows.collect::<Result<Vec<_>, _>>()
276 .map_err(AppError::db_query)
277}
278
279pub fn lookup_fetch_cursor(
280 conn: &Connection,
281 item_id: i64,
282) -> Result<Option<FetchCursor>, AppError> {
283 conn.query_row(
284 "select item_id, created_at from inbox_items where item_id = ?1",
285 params![item_id],
286 |row| {
287 Ok(FetchCursor {
288 item_id: row.get(0)?,
289 created_at: row.get(1)?,
290 })
291 },
292 )
293 .map(Some)
294 .or_else(|err| match err {
295 rusqlite::Error::QueryReturnedNoRows => Ok(None),
296 other => Err(AppError::db_query(other)),
297 })
298}
299
300pub fn fetch_pending_page(
301 conn: &Connection,
302 limit: usize,
303 cursor: Option<&FetchCursor>,
304) -> Result<Vec<FetchItem>, AppError> {
305 let mut stmt = conn
306 .prepare(
307 "select i.item_id, i.created_at, i.source, i.raw_text
308 , null as content_type
309 , null as validation_status
310 from inbox_items i
311 where not exists (
312 select 1 from item_derivations d
313 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
314 )
315 and (
316 ?1 is null
317 or i.created_at < ?2
318 or (i.created_at = ?2 and i.item_id < ?1)
319 )
320 order by i.created_at desc, i.item_id desc
321 limit ?3",
322 )
323 .map_err(AppError::db_query)?;
324
325 let cursor_item_id = cursor.map(|value| value.item_id);
326 let cursor_created_at = cursor.map(|value| value.created_at.as_str());
327
328 let rows = stmt
329 .query_map(
330 params![cursor_item_id, cursor_created_at, limit as i64],
331 |row| {
332 Ok(FetchItem {
333 item_id: row.get(0)?,
334 created_at: row.get(1)?,
335 source: row.get(2)?,
336 text: row.get(3)?,
337 state: "pending".to_string(),
338 content_type: row.get(4)?,
339 validation_status: row.get(5)?,
340 })
341 },
342 )
343 .map_err(AppError::db_query)?;
344
345 rows.collect::<Result<Vec<_>, _>>()
346 .map_err(AppError::db_query)
347}
348
349fn state_sql(state: QueryState) -> &'static str {
350 match state {
351 QueryState::All => "1 = 1",
352 QueryState::Pending => {
353 "not exists (
354 select 1 from item_derivations d
355 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
356 )"
357 }
358 QueryState::Enriched => {
359 "exists (
360 select 1 from item_derivations d
361 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
362 )"
363 }
364 }
365}