Skip to main content

memo_cli/storage/
repository.rs

1use rusqlite::{Connection, params};
2use serde::Serialize;
3
4use crate::errors::AppError;
5
6const INBOX_ITEM_ALLOCATOR_NAME: &str = "inbox_items";
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum QueryState {
10    All,
11    Pending,
12    Enriched,
13}
14
15#[derive(Debug, Clone, Serialize)]
16pub struct AddedItem {
17    pub item_id: i64,
18    pub created_at: String,
19    pub source: String,
20    pub text: String,
21}
22
23#[derive(Debug, Clone, Serialize)]
24pub struct UpdatedItem {
25    pub item_id: i64,
26    pub updated_at: String,
27    pub text: String,
28    pub cleared_derivations: i64,
29    pub cleared_workflow_anchors: i64,
30}
31
32#[derive(Debug, Clone, Serialize)]
33pub struct DeletedItem {
34    pub item_id: i64,
35    pub deleted_at: String,
36    pub removed_derivations: i64,
37    pub removed_workflow_anchors: i64,
38}
39
40#[derive(Debug, Clone, Serialize)]
41pub struct ListItem {
42    pub item_id: i64,
43    pub created_at: String,
44    pub state: String,
45    pub text_preview: String,
46    pub content_type: Option<String>,
47    pub validation_status: Option<String>,
48}
49
50#[derive(Debug, Clone, Serialize)]
51pub struct FetchItem {
52    pub item_id: i64,
53    pub created_at: String,
54    pub source: String,
55    pub text: String,
56    pub state: String,
57    pub content_type: Option<String>,
58    pub validation_status: Option<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct FetchCursor {
63    pub item_id: i64,
64    pub created_at: String,
65}
66
67fn ensure_item_allocator_seeded(conn: &Connection) -> Result<(), AppError> {
68    conn.execute(
69        "insert into id_allocators(name, last_id)
70         values (?1, coalesce((select max(item_id) from inbox_items), 0))
71         on conflict(name) do update
72         set last_id = max(id_allocators.last_id, excluded.last_id)",
73        params![INBOX_ITEM_ALLOCATOR_NAME],
74    )
75    .map_err(AppError::db_write)?;
76    Ok(())
77}
78
79fn allocate_next_item_id(conn: &Connection) -> Result<i64, AppError> {
80    ensure_item_allocator_seeded(conn)?;
81    conn.execute(
82        "update id_allocators
83         set last_id = last_id + 1
84         where name = ?1",
85        params![INBOX_ITEM_ALLOCATOR_NAME],
86    )
87    .map_err(AppError::db_write)?;
88
89    conn.query_row(
90        "select last_id from id_allocators where name = ?1",
91        params![INBOX_ITEM_ALLOCATOR_NAME],
92        |row| row.get(0),
93    )
94    .map_err(AppError::db_query)
95}
96
97pub fn add_item(
98    conn: &Connection,
99    text: &str,
100    source: &str,
101    created_at: Option<&str>,
102) -> Result<AddedItem, AppError> {
103    let item_id = allocate_next_item_id(conn)?;
104
105    if let Some(created_at) = created_at {
106        conn.execute(
107            "insert into inbox_items(item_id, source, raw_text, created_at)
108             values(?1, ?2, ?3, ?4)",
109            params![item_id, source, text, created_at],
110        )
111        .map_err(AppError::db_write)?;
112    } else {
113        conn.execute(
114            "insert into inbox_items(item_id, source, raw_text) values(?1, ?2, ?3)",
115            params![item_id, source, text],
116        )
117        .map_err(AppError::db_write)?;
118    }
119
120    let created_at: String = conn
121        .query_row(
122            "select created_at from inbox_items where item_id = ?1",
123            params![item_id],
124            |row| row.get(0),
125        )
126        .map_err(AppError::db_query)?;
127
128    Ok(AddedItem {
129        item_id,
130        created_at,
131        source: source.to_string(),
132        text: text.to_string(),
133    })
134}
135
136pub fn update_item(conn: &Connection, item_id: i64, text: &str) -> Result<UpdatedItem, AppError> {
137    let text = text.trim();
138    if text.is_empty() {
139        return Err(AppError::usage("update requires a non-empty text argument"));
140    }
141
142    let exists: i64 = conn
143        .query_row(
144            "select count(*) from inbox_items where item_id = ?1",
145            params![item_id],
146            |row| row.get(0),
147        )
148        .map_err(AppError::db_query)?;
149    if exists == 0 {
150        return Err(AppError::usage("item_id does not exist")
151            .with_code("invalid-item-id")
152            .with_details(serde_json::json!({ "item_id": item_id })));
153    }
154
155    let removed_workflow_anchors = conn
156        .execute(
157            "delete from workflow_item_anchors where item_id = ?1",
158            params![item_id],
159        )
160        .map_err(AppError::db_write)? as i64;
161    let cleared_derivations = conn
162        .execute(
163            "delete from item_derivations where item_id = ?1",
164            params![item_id],
165        )
166        .map_err(AppError::db_write)? as i64;
167    conn.execute(
168        "update inbox_items set raw_text = ?1 where item_id = ?2",
169        params![text, item_id],
170    )
171    .map_err(AppError::db_write)?;
172    conn.execute(
173        "delete from tags
174         where not exists (
175           select 1
176           from item_tags it
177           where it.tag_id = tags.tag_id
178         )",
179        [],
180    )
181    .map_err(AppError::db_write)?;
182
183    let updated_at: String = conn
184        .query_row(
185            "select updated_at from item_search_documents where item_id = ?1",
186            params![item_id],
187            |row| row.get(0),
188        )
189        .map_err(AppError::db_query)?;
190
191    Ok(UpdatedItem {
192        item_id,
193        updated_at,
194        text: text.to_string(),
195        cleared_derivations,
196        cleared_workflow_anchors: removed_workflow_anchors,
197    })
198}
199
200pub fn delete_item_hard(conn: &Connection, item_id: i64) -> Result<DeletedItem, AppError> {
201    let exists: i64 = conn
202        .query_row(
203            "select count(*) from inbox_items where item_id = ?1",
204            params![item_id],
205            |row| row.get(0),
206        )
207        .map_err(AppError::db_query)?;
208    if exists == 0 {
209        return Err(AppError::usage("item_id does not exist")
210            .with_code("invalid-item-id")
211            .with_details(serde_json::json!({ "item_id": item_id })));
212    }
213
214    let removed_workflow_anchors = conn
215        .execute(
216            "delete from workflow_item_anchors where item_id = ?1",
217            params![item_id],
218        )
219        .map_err(AppError::db_write)? as i64;
220    let removed_derivations = conn
221        .execute(
222            "delete from item_derivations where item_id = ?1",
223            params![item_id],
224        )
225        .map_err(AppError::db_write)? as i64;
226    conn.execute(
227        "delete from item_search_documents where item_id = ?1",
228        params![item_id],
229    )
230    .map_err(AppError::db_write)?;
231    conn.execute(
232        "delete from inbox_items where item_id = ?1",
233        params![item_id],
234    )
235    .map_err(AppError::db_write)?;
236    conn.execute(
237        "delete from tags
238         where not exists (
239           select 1
240           from item_tags it
241           where it.tag_id = tags.tag_id
242         )",
243        [],
244    )
245    .map_err(AppError::db_write)?;
246
247    let deleted_at: String = conn
248        .query_row("select strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", [], |row| {
249            row.get(0)
250        })
251        .map_err(AppError::db_query)?;
252
253    Ok(DeletedItem {
254        item_id,
255        deleted_at,
256        removed_derivations,
257        removed_workflow_anchors,
258    })
259}
260
261pub fn list_items(
262    conn: &Connection,
263    state: QueryState,
264    limit: usize,
265    offset: usize,
266) -> Result<Vec<ListItem>, AppError> {
267    let state_filter = state_sql(state);
268    let sql = format!(
269        "select
270            i.item_id,
271            i.created_at,
272            case
273                when ad.derivation_id is not null then 'enriched'
274                else 'pending'
275            end as state,
276            substr(i.raw_text, 1, 80) as text_preview,
277            json_extract(ad.payload_json, '$.content_type') as content_type,
278            json_extract(ad.payload_json, '$.validation_status') as validation_status
279        from inbox_items i
280        left join item_derivations ad
281          on ad.derivation_id = (
282            select d.derivation_id
283            from item_derivations d
284            where d.item_id = i.item_id
285              and d.is_active = 1
286              and d.status = 'accepted'
287            order by d.derivation_version desc, d.derivation_id desc
288            limit 1
289          )
290        where {state_filter}
291        order by i.created_at desc, i.item_id desc
292        limit ?1 offset ?2"
293    );
294
295    let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
296    let rows = stmt
297        .query_map(params![limit as i64, offset as i64], |row| {
298            Ok(ListItem {
299                item_id: row.get(0)?,
300                created_at: row.get(1)?,
301                state: row.get(2)?,
302                text_preview: row.get(3)?,
303                content_type: row.get(4)?,
304                validation_status: row.get(5)?,
305            })
306        })
307        .map_err(AppError::db_query)?;
308
309    rows.collect::<Result<Vec<_>, _>>()
310        .map_err(AppError::db_query)
311}
312
313pub fn lookup_fetch_cursor(
314    conn: &Connection,
315    item_id: i64,
316) -> Result<Option<FetchCursor>, AppError> {
317    conn.query_row(
318        "select item_id, created_at from inbox_items where item_id = ?1",
319        params![item_id],
320        |row| {
321            Ok(FetchCursor {
322                item_id: row.get(0)?,
323                created_at: row.get(1)?,
324            })
325        },
326    )
327    .map(Some)
328    .or_else(|err| match err {
329        rusqlite::Error::QueryReturnedNoRows => Ok(None),
330        other => Err(AppError::db_query(other)),
331    })
332}
333
334pub fn fetch_pending_page(
335    conn: &Connection,
336    limit: usize,
337    cursor: Option<&FetchCursor>,
338) -> Result<Vec<FetchItem>, AppError> {
339    let mut stmt = conn
340        .prepare(
341            "select i.item_id, i.created_at, i.source, i.raw_text
342                    , null as content_type
343                    , null as validation_status
344            from inbox_items i
345            where not exists (
346                select 1 from item_derivations d
347                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
348            )
349              and (
350                ?1 is null
351                or i.created_at < ?2
352                or (i.created_at = ?2 and i.item_id < ?1)
353              )
354            order by i.created_at desc, i.item_id desc
355            limit ?3",
356        )
357        .map_err(AppError::db_query)?;
358
359    let cursor_item_id = cursor.map(|value| value.item_id);
360    let cursor_created_at = cursor.map(|value| value.created_at.as_str());
361
362    let rows = stmt
363        .query_map(
364            params![cursor_item_id, cursor_created_at, limit as i64],
365            |row| {
366                Ok(FetchItem {
367                    item_id: row.get(0)?,
368                    created_at: row.get(1)?,
369                    source: row.get(2)?,
370                    text: row.get(3)?,
371                    state: "pending".to_string(),
372                    content_type: row.get(4)?,
373                    validation_status: row.get(5)?,
374                })
375            },
376        )
377        .map_err(AppError::db_query)?;
378
379    rows.collect::<Result<Vec<_>, _>>()
380        .map_err(AppError::db_query)
381}
382
383fn state_sql(state: QueryState) -> &'static str {
384    match state {
385        QueryState::All => "1 = 1",
386        QueryState::Pending => {
387            "not exists (
388                select 1 from item_derivations d
389                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
390            )"
391        }
392        QueryState::Enriched => {
393            "exists (
394                select 1 from item_derivations d
395                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
396            )"
397        }
398    }
399}