1use rusqlite::{Connection, params};
2use serde::Serialize;
3
4use crate::errors::AppError;
5
6const INBOX_ITEM_ALLOCATOR_NAME: &str = "inbox_items";
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum QueryState {
10 All,
11 Pending,
12 Enriched,
13}
14
15#[derive(Debug, Clone, Serialize)]
16pub struct AddedItem {
17 pub item_id: i64,
18 pub created_at: String,
19 pub source: String,
20 pub text: String,
21}
22
23#[derive(Debug, Clone, Serialize)]
24pub struct UpdatedItem {
25 pub item_id: i64,
26 pub updated_at: String,
27 pub text: String,
28 pub cleared_derivations: i64,
29 pub cleared_workflow_anchors: i64,
30}
31
32#[derive(Debug, Clone, Serialize)]
33pub struct DeletedItem {
34 pub item_id: i64,
35 pub deleted_at: String,
36 pub removed_derivations: i64,
37 pub removed_workflow_anchors: i64,
38}
39
40#[derive(Debug, Clone, Serialize)]
41pub struct ListItem {
42 pub item_id: i64,
43 pub created_at: String,
44 pub state: String,
45 pub text_preview: String,
46 pub content_type: Option<String>,
47 pub validation_status: Option<String>,
48}
49
50#[derive(Debug, Clone, Serialize)]
51pub struct FetchItem {
52 pub item_id: i64,
53 pub created_at: String,
54 pub source: String,
55 pub text: String,
56 pub state: String,
57 pub content_type: Option<String>,
58 pub validation_status: Option<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct FetchCursor {
63 pub item_id: i64,
64 pub created_at: String,
65}
66
67fn ensure_item_allocator_seeded(conn: &Connection) -> Result<(), AppError> {
68 conn.execute(
69 "insert into id_allocators(name, last_id)
70 values (?1, coalesce((select max(item_id) from inbox_items), 0))
71 on conflict(name) do update
72 set last_id = max(id_allocators.last_id, excluded.last_id)",
73 params![INBOX_ITEM_ALLOCATOR_NAME],
74 )
75 .map_err(AppError::db_write)?;
76 Ok(())
77}
78
79fn allocate_next_item_id(conn: &Connection) -> Result<i64, AppError> {
80 ensure_item_allocator_seeded(conn)?;
81 conn.execute(
82 "update id_allocators
83 set last_id = last_id + 1
84 where name = ?1",
85 params![INBOX_ITEM_ALLOCATOR_NAME],
86 )
87 .map_err(AppError::db_write)?;
88
89 conn.query_row(
90 "select last_id from id_allocators where name = ?1",
91 params![INBOX_ITEM_ALLOCATOR_NAME],
92 |row| row.get(0),
93 )
94 .map_err(AppError::db_query)
95}
96
97pub fn add_item(
98 conn: &Connection,
99 text: &str,
100 source: &str,
101 created_at: Option<&str>,
102) -> Result<AddedItem, AppError> {
103 let item_id = allocate_next_item_id(conn)?;
104
105 if let Some(created_at) = created_at {
106 conn.execute(
107 "insert into inbox_items(item_id, source, raw_text, created_at)
108 values(?1, ?2, ?3, ?4)",
109 params![item_id, source, text, created_at],
110 )
111 .map_err(AppError::db_write)?;
112 } else {
113 conn.execute(
114 "insert into inbox_items(item_id, source, raw_text) values(?1, ?2, ?3)",
115 params![item_id, source, text],
116 )
117 .map_err(AppError::db_write)?;
118 }
119
120 let created_at: String = conn
121 .query_row(
122 "select created_at from inbox_items where item_id = ?1",
123 params![item_id],
124 |row| row.get(0),
125 )
126 .map_err(AppError::db_query)?;
127
128 Ok(AddedItem {
129 item_id,
130 created_at,
131 source: source.to_string(),
132 text: text.to_string(),
133 })
134}
135
136pub fn update_item(conn: &Connection, item_id: i64, text: &str) -> Result<UpdatedItem, AppError> {
137 let text = text.trim();
138 if text.is_empty() {
139 return Err(AppError::usage("update requires a non-empty text argument"));
140 }
141
142 let exists: i64 = conn
143 .query_row(
144 "select count(*) from inbox_items where item_id = ?1",
145 params![item_id],
146 |row| row.get(0),
147 )
148 .map_err(AppError::db_query)?;
149 if exists == 0 {
150 return Err(AppError::usage("item_id does not exist")
151 .with_code("invalid-item-id")
152 .with_details(serde_json::json!({ "item_id": item_id })));
153 }
154
155 let removed_workflow_anchors = conn
156 .execute(
157 "delete from workflow_item_anchors where item_id = ?1",
158 params![item_id],
159 )
160 .map_err(AppError::db_write)? as i64;
161 let cleared_derivations = conn
162 .execute(
163 "delete from item_derivations where item_id = ?1",
164 params![item_id],
165 )
166 .map_err(AppError::db_write)? as i64;
167 conn.execute(
168 "update inbox_items set raw_text = ?1 where item_id = ?2",
169 params![text, item_id],
170 )
171 .map_err(AppError::db_write)?;
172 conn.execute(
173 "delete from tags
174 where not exists (
175 select 1
176 from item_tags it
177 where it.tag_id = tags.tag_id
178 )",
179 [],
180 )
181 .map_err(AppError::db_write)?;
182
183 let updated_at: String = conn
184 .query_row(
185 "select updated_at from item_search_documents where item_id = ?1",
186 params![item_id],
187 |row| row.get(0),
188 )
189 .map_err(AppError::db_query)?;
190
191 Ok(UpdatedItem {
192 item_id,
193 updated_at,
194 text: text.to_string(),
195 cleared_derivations,
196 cleared_workflow_anchors: removed_workflow_anchors,
197 })
198}
199
200pub fn delete_item_hard(conn: &Connection, item_id: i64) -> Result<DeletedItem, AppError> {
201 let exists: i64 = conn
202 .query_row(
203 "select count(*) from inbox_items where item_id = ?1",
204 params![item_id],
205 |row| row.get(0),
206 )
207 .map_err(AppError::db_query)?;
208 if exists == 0 {
209 return Err(AppError::usage("item_id does not exist")
210 .with_code("invalid-item-id")
211 .with_details(serde_json::json!({ "item_id": item_id })));
212 }
213
214 let removed_workflow_anchors = conn
215 .execute(
216 "delete from workflow_item_anchors where item_id = ?1",
217 params![item_id],
218 )
219 .map_err(AppError::db_write)? as i64;
220 let removed_derivations = conn
221 .execute(
222 "delete from item_derivations where item_id = ?1",
223 params![item_id],
224 )
225 .map_err(AppError::db_write)? as i64;
226 conn.execute(
227 "delete from item_search_documents where item_id = ?1",
228 params![item_id],
229 )
230 .map_err(AppError::db_write)?;
231 conn.execute(
232 "delete from inbox_items where item_id = ?1",
233 params![item_id],
234 )
235 .map_err(AppError::db_write)?;
236 conn.execute(
237 "delete from tags
238 where not exists (
239 select 1
240 from item_tags it
241 where it.tag_id = tags.tag_id
242 )",
243 [],
244 )
245 .map_err(AppError::db_write)?;
246
247 let deleted_at: String = conn
248 .query_row("select strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", [], |row| {
249 row.get(0)
250 })
251 .map_err(AppError::db_query)?;
252
253 Ok(DeletedItem {
254 item_id,
255 deleted_at,
256 removed_derivations,
257 removed_workflow_anchors,
258 })
259}
260
261pub fn list_items(
262 conn: &Connection,
263 state: QueryState,
264 limit: usize,
265 offset: usize,
266) -> Result<Vec<ListItem>, AppError> {
267 let state_filter = state_sql(state);
268 let sql = format!(
269 "select
270 i.item_id,
271 i.created_at,
272 case
273 when ad.derivation_id is not null then 'enriched'
274 else 'pending'
275 end as state,
276 substr(i.raw_text, 1, 80) as text_preview,
277 json_extract(ad.payload_json, '$.content_type') as content_type,
278 json_extract(ad.payload_json, '$.validation_status') as validation_status
279 from inbox_items i
280 left join item_derivations ad
281 on ad.derivation_id = (
282 select d.derivation_id
283 from item_derivations d
284 where d.item_id = i.item_id
285 and d.is_active = 1
286 and d.status = 'accepted'
287 order by d.derivation_version desc, d.derivation_id desc
288 limit 1
289 )
290 where {state_filter}
291 order by i.created_at desc, i.item_id desc
292 limit ?1 offset ?2"
293 );
294
295 let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
296 let rows = stmt
297 .query_map(params![limit as i64, offset as i64], |row| {
298 Ok(ListItem {
299 item_id: row.get(0)?,
300 created_at: row.get(1)?,
301 state: row.get(2)?,
302 text_preview: row.get(3)?,
303 content_type: row.get(4)?,
304 validation_status: row.get(5)?,
305 })
306 })
307 .map_err(AppError::db_query)?;
308
309 rows.collect::<Result<Vec<_>, _>>()
310 .map_err(AppError::db_query)
311}
312
313pub fn lookup_fetch_cursor(
314 conn: &Connection,
315 item_id: i64,
316) -> Result<Option<FetchCursor>, AppError> {
317 conn.query_row(
318 "select item_id, created_at from inbox_items where item_id = ?1",
319 params![item_id],
320 |row| {
321 Ok(FetchCursor {
322 item_id: row.get(0)?,
323 created_at: row.get(1)?,
324 })
325 },
326 )
327 .map(Some)
328 .or_else(|err| match err {
329 rusqlite::Error::QueryReturnedNoRows => Ok(None),
330 other => Err(AppError::db_query(other)),
331 })
332}
333
334pub fn fetch_pending_page(
335 conn: &Connection,
336 limit: usize,
337 cursor: Option<&FetchCursor>,
338) -> Result<Vec<FetchItem>, AppError> {
339 let mut stmt = conn
340 .prepare(
341 "select i.item_id, i.created_at, i.source, i.raw_text
342 , null as content_type
343 , null as validation_status
344 from inbox_items i
345 where not exists (
346 select 1 from item_derivations d
347 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
348 )
349 and (
350 ?1 is null
351 or i.created_at < ?2
352 or (i.created_at = ?2 and i.item_id < ?1)
353 )
354 order by i.created_at desc, i.item_id desc
355 limit ?3",
356 )
357 .map_err(AppError::db_query)?;
358
359 let cursor_item_id = cursor.map(|value| value.item_id);
360 let cursor_created_at = cursor.map(|value| value.created_at.as_str());
361
362 let rows = stmt
363 .query_map(
364 params![cursor_item_id, cursor_created_at, limit as i64],
365 |row| {
366 Ok(FetchItem {
367 item_id: row.get(0)?,
368 created_at: row.get(1)?,
369 source: row.get(2)?,
370 text: row.get(3)?,
371 state: "pending".to_string(),
372 content_type: row.get(4)?,
373 validation_status: row.get(5)?,
374 })
375 },
376 )
377 .map_err(AppError::db_query)?;
378
379 rows.collect::<Result<Vec<_>, _>>()
380 .map_err(AppError::db_query)
381}
382
383fn state_sql(state: QueryState) -> &'static str {
384 match state {
385 QueryState::All => "1 = 1",
386 QueryState::Pending => {
387 "not exists (
388 select 1 from item_derivations d
389 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
390 )"
391 }
392 QueryState::Enriched => {
393 "exists (
394 select 1 from item_derivations d
395 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
396 )"
397 }
398 }
399}