1use rusqlite::{Connection, params};
2use serde::Serialize;
3
4use crate::errors::AppError;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum QueryState {
8 All,
9 Pending,
10 Enriched,
11}
12
13#[derive(Debug, Clone, Serialize)]
14pub struct AddedItem {
15 pub item_id: i64,
16 pub created_at: String,
17 pub source: String,
18 pub text: String,
19}
20
21#[derive(Debug, Clone, Serialize)]
22pub struct ListItem {
23 pub item_id: i64,
24 pub created_at: String,
25 pub state: String,
26 pub text_preview: String,
27 pub content_type: Option<String>,
28 pub validation_status: Option<String>,
29}
30
31#[derive(Debug, Clone, Serialize)]
32pub struct FetchItem {
33 pub item_id: i64,
34 pub created_at: String,
35 pub source: String,
36 pub text: String,
37 pub state: String,
38 pub content_type: Option<String>,
39 pub validation_status: Option<String>,
40}
41
42#[derive(Debug, Clone)]
43pub struct FetchCursor {
44 pub item_id: i64,
45 pub created_at: String,
46}
47
48pub fn add_item(
49 conn: &Connection,
50 text: &str,
51 source: &str,
52 created_at: Option<&str>,
53) -> Result<AddedItem, AppError> {
54 if let Some(created_at) = created_at {
55 conn.execute(
56 "insert into inbox_items(source, raw_text, created_at) values(?1, ?2, ?3)",
57 params![source, text, created_at],
58 )
59 .map_err(AppError::db_write)?;
60 } else {
61 conn.execute(
62 "insert into inbox_items(source, raw_text) values(?1, ?2)",
63 params![source, text],
64 )
65 .map_err(AppError::db_write)?;
66 }
67
68 let item_id = conn.last_insert_rowid();
69 let created_at: String = conn
70 .query_row(
71 "select created_at from inbox_items where item_id = ?1",
72 params![item_id],
73 |row| row.get(0),
74 )
75 .map_err(AppError::db_query)?;
76
77 Ok(AddedItem {
78 item_id,
79 created_at,
80 source: source.to_string(),
81 text: text.to_string(),
82 })
83}
84
85pub fn list_items(
86 conn: &Connection,
87 state: QueryState,
88 limit: usize,
89 offset: usize,
90) -> Result<Vec<ListItem>, AppError> {
91 let state_filter = state_sql(state);
92 let sql = format!(
93 "select
94 i.item_id,
95 i.created_at,
96 case
97 when ad.derivation_id is not null then 'enriched'
98 else 'pending'
99 end as state,
100 substr(i.raw_text, 1, 80) as text_preview,
101 json_extract(ad.payload_json, '$.content_type') as content_type,
102 json_extract(ad.payload_json, '$.validation_status') as validation_status
103 from inbox_items i
104 left join item_derivations ad
105 on ad.derivation_id = (
106 select d.derivation_id
107 from item_derivations d
108 where d.item_id = i.item_id
109 and d.is_active = 1
110 and d.status = 'accepted'
111 order by d.derivation_version desc, d.derivation_id desc
112 limit 1
113 )
114 where {state_filter}
115 order by i.created_at desc, i.item_id desc
116 limit ?1 offset ?2"
117 );
118
119 let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
120 let rows = stmt
121 .query_map(params![limit as i64, offset as i64], |row| {
122 Ok(ListItem {
123 item_id: row.get(0)?,
124 created_at: row.get(1)?,
125 state: row.get(2)?,
126 text_preview: row.get(3)?,
127 content_type: row.get(4)?,
128 validation_status: row.get(5)?,
129 })
130 })
131 .map_err(AppError::db_query)?;
132
133 rows.collect::<Result<Vec<_>, _>>()
134 .map_err(AppError::db_query)
135}
136
137pub fn lookup_fetch_cursor(
138 conn: &Connection,
139 item_id: i64,
140) -> Result<Option<FetchCursor>, AppError> {
141 conn.query_row(
142 "select item_id, created_at from inbox_items where item_id = ?1",
143 params![item_id],
144 |row| {
145 Ok(FetchCursor {
146 item_id: row.get(0)?,
147 created_at: row.get(1)?,
148 })
149 },
150 )
151 .map(Some)
152 .or_else(|err| match err {
153 rusqlite::Error::QueryReturnedNoRows => Ok(None),
154 other => Err(AppError::db_query(other)),
155 })
156}
157
158pub fn fetch_pending_page(
159 conn: &Connection,
160 limit: usize,
161 cursor: Option<&FetchCursor>,
162) -> Result<Vec<FetchItem>, AppError> {
163 let mut stmt = conn
164 .prepare(
165 "select i.item_id, i.created_at, i.source, i.raw_text
166 , null as content_type
167 , null as validation_status
168 from inbox_items i
169 where not exists (
170 select 1 from item_derivations d
171 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
172 )
173 and (
174 ?1 is null
175 or i.created_at < ?2
176 or (i.created_at = ?2 and i.item_id < ?1)
177 )
178 order by i.created_at desc, i.item_id desc
179 limit ?3",
180 )
181 .map_err(AppError::db_query)?;
182
183 let cursor_item_id = cursor.map(|value| value.item_id);
184 let cursor_created_at = cursor.map(|value| value.created_at.as_str());
185
186 let rows = stmt
187 .query_map(
188 params![cursor_item_id, cursor_created_at, limit as i64],
189 |row| {
190 Ok(FetchItem {
191 item_id: row.get(0)?,
192 created_at: row.get(1)?,
193 source: row.get(2)?,
194 text: row.get(3)?,
195 state: "pending".to_string(),
196 content_type: row.get(4)?,
197 validation_status: row.get(5)?,
198 })
199 },
200 )
201 .map_err(AppError::db_query)?;
202
203 rows.collect::<Result<Vec<_>, _>>()
204 .map_err(AppError::db_query)
205}
206
207fn state_sql(state: QueryState) -> &'static str {
208 match state {
209 QueryState::All => "1 = 1",
210 QueryState::Pending => {
211 "not exists (
212 select 1 from item_derivations d
213 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
214 )"
215 }
216 QueryState::Enriched => {
217 "exists (
218 select 1 from item_derivations d
219 where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
220 )"
221 }
222 }
223}