Skip to main content

memo_cli/storage/
repository.rs

1use rusqlite::{Connection, params};
2use serde::Serialize;
3
4use crate::errors::AppError;
5
/// Filter applied by `list_items`, keyed on whether an item has an active derivation whose
/// status is `accepted` (the exact SQL predicates live in `state_sql`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryState {
    /// No filtering; every inbox item matches.
    All,
    /// Items that have no active, accepted derivation.
    Pending,
    /// Items that have at least one active, accepted derivation.
    Enriched,
}
12
/// Result of `add_item`: the row that was just inserted into `inbox_items`.
#[derive(Debug, Clone, Serialize)]
pub struct AddedItem {
    /// Row id reported by the connection for the insert.
    pub item_id: i64,
    /// `created_at` value read back from the stored row after the insert.
    pub created_at: String,
    /// The `source` argument, echoed back.
    pub source: String,
    /// The `text` argument, echoed back.
    pub text: String,
}
20
/// Result of `update_item`.
#[derive(Debug, Clone, Serialize)]
pub struct UpdatedItem {
    /// Id of the item whose text was replaced.
    pub item_id: i64,
    /// `updated_at` value read from the item's `item_search_documents` row after the update.
    pub updated_at: String,
    /// The new text as stored (already trimmed).
    pub text: String,
    /// Number of `item_derivations` rows deleted for this item.
    pub cleared_derivations: i64,
    /// Number of `workflow_item_anchors` rows deleted for this item.
    pub cleared_workflow_anchors: i64,
}
29
/// Result of `delete_item_hard`.
#[derive(Debug, Clone, Serialize)]
pub struct DeletedItem {
    /// Id of the item that was removed.
    pub item_id: i64,
    /// Timestamp produced by SQLite (`strftime('%Y-%m-%dT%H:%M:%fZ', 'now')`) after the deletes ran.
    pub deleted_at: String,
    /// Number of `item_derivations` rows deleted for this item.
    pub removed_derivations: i64,
    /// Number of `workflow_item_anchors` rows deleted for this item.
    pub removed_workflow_anchors: i64,
}
37
/// One row of the `list_items` result.
#[derive(Debug, Clone, Serialize)]
pub struct ListItem {
    /// Id of the inbox item.
    pub item_id: i64,
    /// `created_at` value of the inbox item.
    pub created_at: String,
    /// `"enriched"` when an active, accepted derivation was joined, otherwise `"pending"`.
    pub state: String,
    /// Leading part of the raw text, cut with SQL `substr(raw_text, 1, 80)`.
    pub text_preview: String,
    /// `$.content_type` extracted from the joined derivation's `payload_json`; `None` when
    /// nothing was joined or the extraction yields SQL null.
    pub content_type: Option<String>,
    /// `$.validation_status` extracted from the joined derivation's `payload_json`; `None`
    /// when nothing was joined or the extraction yields SQL null.
    pub validation_status: Option<String>,
}
47
/// One row of a fetch page (see `fetch_pending_page`), carrying the full item text.
#[derive(Debug, Clone, Serialize)]
pub struct FetchItem {
    /// Id of the inbox item.
    pub item_id: i64,
    /// `created_at` value of the inbox item.
    pub created_at: String,
    /// `source` column of the inbox item.
    pub source: String,
    /// Full `raw_text` of the inbox item.
    pub text: String,
    /// Item state label; `fetch_pending_page` always sets this to `"pending"`.
    pub state: String,
    /// Derivation content type; `fetch_pending_page` selects SQL null here, so it is `None`.
    pub content_type: Option<String>,
    /// Derivation validation status; `fetch_pending_page` selects SQL null here, so it is `None`.
    pub validation_status: Option<String>,
}
58
/// Position marker for `fetch_pending_page`: rows are returned only when they sort after this
/// `(created_at, item_id)` pair in the page's descending order.
#[derive(Debug, Clone)]
pub struct FetchCursor {
    /// Id of the item the cursor points at (tie-breaker when `created_at` values are equal).
    pub item_id: i64,
    /// `created_at` value of the item the cursor points at.
    pub created_at: String,
}
64
65pub fn add_item(
66    conn: &Connection,
67    text: &str,
68    source: &str,
69    created_at: Option<&str>,
70) -> Result<AddedItem, AppError> {
71    if let Some(created_at) = created_at {
72        conn.execute(
73            "insert into inbox_items(source, raw_text, created_at) values(?1, ?2, ?3)",
74            params![source, text, created_at],
75        )
76        .map_err(AppError::db_write)?;
77    } else {
78        conn.execute(
79            "insert into inbox_items(source, raw_text) values(?1, ?2)",
80            params![source, text],
81        )
82        .map_err(AppError::db_write)?;
83    }
84
85    let item_id = conn.last_insert_rowid();
86    let created_at: String = conn
87        .query_row(
88            "select created_at from inbox_items where item_id = ?1",
89            params![item_id],
90            |row| row.get(0),
91        )
92        .map_err(AppError::db_query)?;
93
94    Ok(AddedItem {
95        item_id,
96        created_at,
97        source: source.to_string(),
98        text: text.to_string(),
99    })
100}
101
102pub fn update_item(conn: &Connection, item_id: i64, text: &str) -> Result<UpdatedItem, AppError> {
103    let text = text.trim();
104    if text.is_empty() {
105        return Err(AppError::usage("update requires a non-empty text argument"));
106    }
107
108    let exists: i64 = conn
109        .query_row(
110            "select count(*) from inbox_items where item_id = ?1",
111            params![item_id],
112            |row| row.get(0),
113        )
114        .map_err(AppError::db_query)?;
115    if exists == 0 {
116        return Err(AppError::usage("item_id does not exist")
117            .with_code("invalid-item-id")
118            .with_details(serde_json::json!({ "item_id": item_id })));
119    }
120
121    let removed_workflow_anchors = conn
122        .execute(
123            "delete from workflow_item_anchors where item_id = ?1",
124            params![item_id],
125        )
126        .map_err(AppError::db_write)? as i64;
127    let cleared_derivations = conn
128        .execute(
129            "delete from item_derivations where item_id = ?1",
130            params![item_id],
131        )
132        .map_err(AppError::db_write)? as i64;
133    conn.execute(
134        "update inbox_items set raw_text = ?1 where item_id = ?2",
135        params![text, item_id],
136    )
137    .map_err(AppError::db_write)?;
138    conn.execute(
139        "delete from tags
140         where not exists (
141           select 1
142           from item_tags it
143           where it.tag_id = tags.tag_id
144         )",
145        [],
146    )
147    .map_err(AppError::db_write)?;
148
149    let updated_at: String = conn
150        .query_row(
151            "select updated_at from item_search_documents where item_id = ?1",
152            params![item_id],
153            |row| row.get(0),
154        )
155        .map_err(AppError::db_query)?;
156
157    Ok(UpdatedItem {
158        item_id,
159        updated_at,
160        text: text.to_string(),
161        cleared_derivations,
162        cleared_workflow_anchors: removed_workflow_anchors,
163    })
164}
165
166pub fn delete_item_hard(conn: &Connection, item_id: i64) -> Result<DeletedItem, AppError> {
167    let exists: i64 = conn
168        .query_row(
169            "select count(*) from inbox_items where item_id = ?1",
170            params![item_id],
171            |row| row.get(0),
172        )
173        .map_err(AppError::db_query)?;
174    if exists == 0 {
175        return Err(AppError::usage("item_id does not exist")
176            .with_code("invalid-item-id")
177            .with_details(serde_json::json!({ "item_id": item_id })));
178    }
179
180    let removed_workflow_anchors = conn
181        .execute(
182            "delete from workflow_item_anchors where item_id = ?1",
183            params![item_id],
184        )
185        .map_err(AppError::db_write)? as i64;
186    let removed_derivations = conn
187        .execute(
188            "delete from item_derivations where item_id = ?1",
189            params![item_id],
190        )
191        .map_err(AppError::db_write)? as i64;
192    conn.execute(
193        "delete from item_search_documents where item_id = ?1",
194        params![item_id],
195    )
196    .map_err(AppError::db_write)?;
197    conn.execute(
198        "delete from inbox_items where item_id = ?1",
199        params![item_id],
200    )
201    .map_err(AppError::db_write)?;
202    conn.execute(
203        "delete from tags
204         where not exists (
205           select 1
206           from item_tags it
207           where it.tag_id = tags.tag_id
208         )",
209        [],
210    )
211    .map_err(AppError::db_write)?;
212
213    let deleted_at: String = conn
214        .query_row("select strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", [], |row| {
215            row.get(0)
216        })
217        .map_err(AppError::db_query)?;
218
219    Ok(DeletedItem {
220        item_id,
221        deleted_at,
222        removed_derivations,
223        removed_workflow_anchors,
224    })
225}
226
227pub fn list_items(
228    conn: &Connection,
229    state: QueryState,
230    limit: usize,
231    offset: usize,
232) -> Result<Vec<ListItem>, AppError> {
233    let state_filter = state_sql(state);
234    let sql = format!(
235        "select
236            i.item_id,
237            i.created_at,
238            case
239                when ad.derivation_id is not null then 'enriched'
240                else 'pending'
241            end as state,
242            substr(i.raw_text, 1, 80) as text_preview,
243            json_extract(ad.payload_json, '$.content_type') as content_type,
244            json_extract(ad.payload_json, '$.validation_status') as validation_status
245        from inbox_items i
246        left join item_derivations ad
247          on ad.derivation_id = (
248            select d.derivation_id
249            from item_derivations d
250            where d.item_id = i.item_id
251              and d.is_active = 1
252              and d.status = 'accepted'
253            order by d.derivation_version desc, d.derivation_id desc
254            limit 1
255          )
256        where {state_filter}
257        order by i.created_at desc, i.item_id desc
258        limit ?1 offset ?2"
259    );
260
261    let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
262    let rows = stmt
263        .query_map(params![limit as i64, offset as i64], |row| {
264            Ok(ListItem {
265                item_id: row.get(0)?,
266                created_at: row.get(1)?,
267                state: row.get(2)?,
268                text_preview: row.get(3)?,
269                content_type: row.get(4)?,
270                validation_status: row.get(5)?,
271            })
272        })
273        .map_err(AppError::db_query)?;
274
275    rows.collect::<Result<Vec<_>, _>>()
276        .map_err(AppError::db_query)
277}
278
279pub fn lookup_fetch_cursor(
280    conn: &Connection,
281    item_id: i64,
282) -> Result<Option<FetchCursor>, AppError> {
283    conn.query_row(
284        "select item_id, created_at from inbox_items where item_id = ?1",
285        params![item_id],
286        |row| {
287            Ok(FetchCursor {
288                item_id: row.get(0)?,
289                created_at: row.get(1)?,
290            })
291        },
292    )
293    .map(Some)
294    .or_else(|err| match err {
295        rusqlite::Error::QueryReturnedNoRows => Ok(None),
296        other => Err(AppError::db_query(other)),
297    })
298}
299
300pub fn fetch_pending_page(
301    conn: &Connection,
302    limit: usize,
303    cursor: Option<&FetchCursor>,
304) -> Result<Vec<FetchItem>, AppError> {
305    let mut stmt = conn
306        .prepare(
307            "select i.item_id, i.created_at, i.source, i.raw_text
308                    , null as content_type
309                    , null as validation_status
310            from inbox_items i
311            where not exists (
312                select 1 from item_derivations d
313                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
314            )
315              and (
316                ?1 is null
317                or i.created_at < ?2
318                or (i.created_at = ?2 and i.item_id < ?1)
319              )
320            order by i.created_at desc, i.item_id desc
321            limit ?3",
322        )
323        .map_err(AppError::db_query)?;
324
325    let cursor_item_id = cursor.map(|value| value.item_id);
326    let cursor_created_at = cursor.map(|value| value.created_at.as_str());
327
328    let rows = stmt
329        .query_map(
330            params![cursor_item_id, cursor_created_at, limit as i64],
331            |row| {
332                Ok(FetchItem {
333                    item_id: row.get(0)?,
334                    created_at: row.get(1)?,
335                    source: row.get(2)?,
336                    text: row.get(3)?,
337                    state: "pending".to_string(),
338                    content_type: row.get(4)?,
339                    validation_status: row.get(5)?,
340                })
341            },
342        )
343        .map_err(AppError::db_query)?;
344
345    rows.collect::<Result<Vec<_>, _>>()
346        .map_err(AppError::db_query)
347}
348
349fn state_sql(state: QueryState) -> &'static str {
350    match state {
351        QueryState::All => "1 = 1",
352        QueryState::Pending => {
353            "not exists (
354                select 1 from item_derivations d
355                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
356            )"
357        }
358        QueryState::Enriched => {
359            "exists (
360                select 1 from item_derivations d
361                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
362            )"
363        }
364    }
365}