// memo_cli/storage/derivations.rs

use std::collections::HashSet;

use rusqlite::{Transaction, params};
use serde::Serialize;

use crate::errors::AppError;
use crate::preprocess::{self, ValidationStatus};

/// Status an agent assigns to a derivation it submits for an inbox item.
///
/// When applied, only `Accepted` derivations are stored as the item's active
/// derivation and receive tags; `Rejected` and `Conflict` rows are stored inactive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncomingStatus {
    Accepted,
    Rejected,
    Conflict,
}

impl IncomingStatus {
    /// Parses the wire spelling of a status: `"accepted"`, `"rejected"` or `"conflict"`.
    ///
    /// The comparison is exact (case- and whitespace-sensitive); anything else yields
    /// `None`.
    pub fn parse(value: &str) -> Option<Self> {
        let status = match value {
            "accepted" => Self::Accepted,
            "rejected" => Self::Rejected,
            "conflict" => Self::Conflict,
            _ => return None,
        };
        Some(status)
    }
}

/// One derivation submitted by an agent for an existing inbox item; the unit of work
/// processed by `apply_items`.
#[derive(Debug, Clone)]
pub struct ApplyInputItem {
    /// Target inbox item; an id missing from `inbox_items` makes the item fail.
    pub item_id: i64,
    /// Status the new derivation row is recorded with.
    pub status: IncomingStatus,
    /// Identifies this derivation for the item; if a row with the same hash already
    /// exists for the item, the item is skipped and the existing version reported.
    pub derivation_hash: String,
    /// Derivation the agent based its work on. When set, it must still be the item's
    /// active accepted derivation, otherwise the item is skipped as a conflict.
    pub base_derivation_id: Option<i64>,
    /// Stored as-is in `item_derivations.summary`.
    pub summary: Option<String>,
    /// Stored as-is in `item_derivations.category`.
    pub category: Option<String>,
    /// Stored as-is in `item_derivations.priority`.
    pub priority: Option<String>,
    /// Stored as-is in `item_derivations.due_at`.
    pub due_at: Option<String>,
    /// Stored as-is in `item_derivations.normalized_text`.
    pub normalized_text: Option<String>,
    /// Stored as-is in `item_derivations.confidence`.
    pub confidence: Option<f64>,
    /// Caller-supplied content type; derived from the item's raw text when absent.
    pub content_type: Option<String>,
    /// Caller-supplied validation status; derived from the item's raw text when absent.
    pub validation_status: Option<String>,
    /// Caller-supplied validation errors; may be filled from raw-text analysis.
    pub validation_errors: Option<Vec<ApplyValidationError>>,
    /// Free-form payload; persisted as JSON with the resolved metadata merged in
    /// (a non-object value is wrapped under a `"payload"` key).
    pub payload_json: serde_json::Value,
    /// Required when `status` is `Conflict`; stored in `item_derivations.conflict_reason`.
    pub conflict_reason: Option<String>,
    /// Tags attached to accepted derivations, alongside generated `fmt:`/`val:` tags.
    pub tags: Vec<String>,
    /// Agent run to attribute the derivation to; a missing or blank value falls back to
    /// the batch default.
    pub agent_run_id: Option<String>,
}

/// A single validation problem reported for an item's content.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct ApplyValidationError {
    /// Machine-readable error code.
    pub code: String,
    /// Human-readable description.
    pub message: String,
    /// Optional location of the problem; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}

/// Error attached to a `failed` or conflicting `skipped` item outcome.
#[derive(Debug, Clone, Serialize)]
pub struct ApplyItemError {
    /// Machine-readable code, e.g. `invalid-apply-payload` or `apply-item-conflict`.
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
    /// Optional structured context; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<serde_json::Value>,
}

/// Per-item result of an apply run.
///
/// Every `Option` field is omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ApplyItemOutcome {
    /// The inbox item this outcome refers to.
    pub item_id: i64,
    /// `"accepted"`, `"skipped"` or `"failed"` as produced by `apply_items`.
    pub status: String,
    /// Version written (or that would be written in a dry run), or the existing version
    /// when the derivation hash was already present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub derivation_version: Option<i64>,
    /// Resolved content type for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_type: Option<String>,
    /// Resolved validation status for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub validation_status: Option<String>,
    /// Resolved validation errors for the item.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub validation_errors: Option<Vec<ApplyValidationError>>,
    /// Why the item failed or was skipped as a conflict.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ApplyItemError>,
}

80impl ApplyItemOutcome {
81    pub fn content_type(&self) -> Option<&str> {
82        self.content_type.as_deref()
83    }
84
85    pub fn validation_status(&self) -> Option<&str> {
86        self.validation_status.as_deref()
87    }
88
89    pub fn validation_errors(&self) -> Option<&[ApplyValidationError]> {
90        self.validation_errors.as_deref()
91    }
92}
93
/// Aggregate result of `apply_items` for one batch.
#[derive(Debug, Clone, Serialize)]
pub struct ApplySummary {
    /// Whether the run was a dry run (no rows written).
    pub dry_run: bool,
    /// Number of input items in the batch.
    pub processed: i64,
    /// Items whose outcome status is `"accepted"`.
    pub accepted: i64,
    /// Items whose outcome status is `"skipped"`.
    pub skipped: i64,
    /// Items whose outcome status is `"failed"`.
    pub failed: i64,
    /// One outcome per input item, in input order.
    pub items: Vec<ApplyItemOutcome>,
}

/// Content/validation metadata for one item: caller-supplied values, with gaps later
/// filled from analysis of the item's raw text.
#[derive(Debug, Clone, Default)]
struct ResolvedMetadata {
    content_type: Option<String>,
    validation_status: Option<String>,
    validation_errors: Option<Vec<ApplyValidationError>>,
}

111impl ResolvedMetadata {
112    fn from_item(item: &ApplyInputItem) -> Self {
113        Self {
114            content_type: item.content_type.clone(),
115            validation_status: item.validation_status.clone(),
116            validation_errors: item.validation_errors.clone(),
117        }
118    }
119}
120
121pub fn apply_items(
122    tx: &Transaction<'_>,
123    items: &[ApplyInputItem],
124    dry_run: bool,
125    default_agent_run_id: &str,
126) -> Result<ApplySummary, AppError> {
127    let mut accepted = 0_i64;
128    let mut skipped = 0_i64;
129    let mut failed = 0_i64;
130    let mut outcomes = Vec::with_capacity(items.len());
131
132    for item in items {
133        let mut metadata = ResolvedMetadata::from_item(item);
134
135        if !item_exists(tx, item.item_id)? {
136            failed += 1;
137            outcomes.push(build_outcome(
138                item.item_id,
139                "failed",
140                None,
141                &metadata,
142                Some(ApplyItemError {
143                    code: "invalid-apply-payload".to_string(),
144                    message: "item_id does not exist".to_string(),
145                    details: None,
146                }),
147            ));
148            continue;
149        }
150
151        populate_missing_metadata(tx, item.item_id, &mut metadata)?;
152
153        let active = current_active(tx, item.item_id)?;
154        if let Some(base_derivation_id) = item.base_derivation_id
155            && active.map(|row| row.0) != Some(base_derivation_id)
156        {
157            skipped += 1;
158            outcomes.push(build_outcome(
159                item.item_id,
160                "skipped",
161                None,
162                &metadata,
163                Some(ApplyItemError {
164                    code: "apply-item-conflict".to_string(),
165                    message: "incoming base derivation does not match active derivation"
166                        .to_string(),
167                    details: Some(serde_json::json!({
168                        "incoming_base_derivation_id": base_derivation_id,
169                        "active_derivation_id": active.map(|row| row.0),
170                    })),
171                }),
172            ));
173            continue;
174        }
175
176        if let Some(existing_version) =
177            derivation_version_by_hash(tx, item.item_id, &item.derivation_hash)?
178        {
179            skipped += 1;
180            outcomes.push(build_outcome(
181                item.item_id,
182                "skipped",
183                Some(existing_version),
184                &metadata,
185                None,
186            ));
187            continue;
188        }
189
190        if item.status == IncomingStatus::Conflict && item.conflict_reason.is_none() {
191            failed += 1;
192            outcomes.push(build_outcome(
193                item.item_id,
194                "failed",
195                None,
196                &metadata,
197                Some(ApplyItemError {
198                    code: "invalid-apply-payload".to_string(),
199                    message: "status=conflict requires conflict_reason".to_string(),
200                    details: None,
201                }),
202            ));
203            continue;
204        }
205
206        let next_version = next_derivation_version(tx, item.item_id)?;
207        if dry_run {
208            accepted += 1;
209            outcomes.push(build_outcome(
210                item.item_id,
211                "accepted",
212                Some(next_version),
213                &metadata,
214                None,
215            ));
216            continue;
217        }
218
219        if item.status == IncomingStatus::Accepted {
220            tx.execute(
221                "update item_derivations
222                 set is_active = 0
223                 where item_id = ?1 and is_active = 1 and status = 'accepted'",
224                params![item.item_id],
225            )
226            .map_err(AppError::db_write)?;
227        }
228
229        let payload_json = serde_json::to_string(&merge_payload_with_metadata(
230            item.payload_json.clone(),
231            &metadata,
232        ))
233        .map_err(|err| {
234            AppError::invalid_apply_payload(
235                format!(
236                    "payload serialization failed for item {}: {err}",
237                    item.item_id
238                ),
239                None,
240            )
241        })?;
242        let status_str = match item.status {
243            IncomingStatus::Accepted => "accepted",
244            IncomingStatus::Rejected => "rejected",
245            IncomingStatus::Conflict => "conflict",
246        };
247        let is_active = if item.status == IncomingStatus::Accepted {
248            1
249        } else {
250            0
251        };
252        let agent_run_id = item
253            .agent_run_id
254            .as_deref()
255            .filter(|value| !value.trim().is_empty())
256            .unwrap_or(default_agent_run_id);
257
258        tx.execute(
259            "insert into item_derivations(
260                item_id,
261                derivation_version,
262                status,
263                is_active,
264                base_derivation_id,
265                derivation_hash,
266                agent_run_id,
267                summary,
268                category,
269                priority,
270                due_at,
271                normalized_text,
272                confidence,
273                payload_json,
274                conflict_reason
275            ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
276            params![
277                item.item_id,
278                next_version,
279                status_str,
280                is_active,
281                item.base_derivation_id,
282                item.derivation_hash,
283                agent_run_id,
284                item.summary,
285                item.category,
286                item.priority,
287                item.due_at,
288                item.normalized_text,
289                item.confidence,
290                payload_json,
291                item.conflict_reason,
292            ],
293        )
294        .map_err(|err| {
295            if let rusqlite::Error::SqliteFailure(native, _) = &err
296                && native.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE
297            {
298                return AppError::runtime("duplicate derivation hash for item")
299                    .with_code("apply-item-conflict");
300            }
301            AppError::db_write(err)
302        })?;
303        let derivation_id = tx.last_insert_rowid();
304
305        if item.status == IncomingStatus::Accepted {
306            let tag_list = tags_with_metadata(&item.tags, &metadata);
307            attach_tags(tx, derivation_id, &tag_list)?;
308        }
309
310        accepted += 1;
311        outcomes.push(build_outcome(
312            item.item_id,
313            "accepted",
314            Some(next_version),
315            &metadata,
316            None,
317        ));
318    }
319
320    Ok(ApplySummary {
321        dry_run,
322        processed: items.len() as i64,
323        accepted,
324        skipped,
325        failed,
326        items: outcomes,
327    })
328}
329
330fn build_outcome(
331    item_id: i64,
332    status: &str,
333    derivation_version: Option<i64>,
334    metadata: &ResolvedMetadata,
335    error: Option<ApplyItemError>,
336) -> ApplyItemOutcome {
337    ApplyItemOutcome {
338        item_id,
339        status: status.to_string(),
340        derivation_version,
341        content_type: metadata.content_type.clone(),
342        validation_status: metadata.validation_status.clone(),
343        validation_errors: metadata.validation_errors.clone(),
344        error,
345    }
346}
347
348fn populate_missing_metadata(
349    tx: &Transaction<'_>,
350    item_id: i64,
351    metadata: &mut ResolvedMetadata,
352) -> Result<(), AppError> {
353    let needs_content_type = metadata.content_type.is_none();
354    let needs_status = metadata.validation_status.is_none();
355    let needs_invalid_errors = matches!(metadata.validation_status.as_deref(), Some("invalid"))
356        && metadata.validation_errors.is_none();
357
358    if !needs_content_type && !needs_status && !needs_invalid_errors {
359        return Ok(());
360    }
361
362    let raw_text: String = tx
363        .query_row(
364            "select raw_text from inbox_items where item_id = ?1",
365            params![item_id],
366            |row| row.get(0),
367        )
368        .map_err(AppError::db_query)?;
369    let analyzed = preprocess::analyze(&raw_text);
370
371    if needs_content_type {
372        metadata.content_type = Some(analyzed.content_type.as_str().to_string());
373    }
374    if needs_status {
375        metadata.validation_status = Some(analyzed.validation.status.as_str().to_string());
376    }
377
378    let invalid_from_analysis = matches!(analyzed.validation.status, ValidationStatus::Invalid);
379    if (metadata.validation_errors.is_none() && invalid_from_analysis) || needs_invalid_errors {
380        metadata.validation_errors = analyzed.validation.errors.map(|errors| {
381            errors
382                .into_iter()
383                .map(|err| ApplyValidationError {
384                    code: err.code,
385                    message: err.message,
386                    path: err.path,
387                })
388                .collect::<Vec<_>>()
389        });
390    }
391
392    Ok(())
393}
394
395fn tags_with_metadata(base_tags: &[String], metadata: &ResolvedMetadata) -> Vec<String> {
396    let mut tags = base_tags.to_vec();
397    if let Some(content_type) = &metadata.content_type {
398        tags.push(format!("fmt:{content_type}"));
399    }
400    if let Some(validation_status) = &metadata.validation_status {
401        tags.push(format!("val:{validation_status}"));
402    }
403    tags
404}
405
406fn merge_payload_with_metadata(
407    payload_json: serde_json::Value,
408    metadata: &ResolvedMetadata,
409) -> serde_json::Value {
410    let mut map = match payload_json {
411        serde_json::Value::Object(object) => object,
412        other => {
413            let mut object = serde_json::Map::new();
414            object.insert("payload".to_string(), other);
415            object
416        }
417    };
418
419    if let Some(content_type) = &metadata.content_type {
420        map.insert(
421            "content_type".to_string(),
422            serde_json::Value::String(content_type.clone()),
423        );
424    }
425    if let Some(validation_status) = &metadata.validation_status {
426        map.insert(
427            "validation_status".to_string(),
428            serde_json::Value::String(validation_status.clone()),
429        );
430    }
431    if let Some(validation_errors) = &metadata.validation_errors
432        && let Ok(encoded) = serde_json::to_value(validation_errors)
433    {
434        map.insert("validation_errors".to_string(), encoded);
435    }
436
437    serde_json::Value::Object(map)
438}
439
440fn item_exists(tx: &Transaction<'_>, item_id: i64) -> Result<bool, AppError> {
441    let count: i64 = tx
442        .query_row(
443            "select count(*) from inbox_items where item_id = ?1",
444            params![item_id],
445            |row| row.get(0),
446        )
447        .map_err(AppError::db_query)?;
448    Ok(count > 0)
449}
450
451fn current_active(tx: &Transaction<'_>, item_id: i64) -> Result<Option<(i64, i64)>, AppError> {
452    tx.query_row(
453        "select derivation_id, derivation_version
454         from item_derivations
455         where item_id = ?1 and is_active = 1 and status = 'accepted'
456         order by derivation_version desc, derivation_id desc
457         limit 1",
458        params![item_id],
459        |row| Ok((row.get(0)?, row.get(1)?)),
460    )
461    .map(Some)
462    .or_else(|err| match err {
463        rusqlite::Error::QueryReturnedNoRows => Ok(None),
464        other => Err(AppError::db_query(other)),
465    })
466}
467
468fn derivation_version_by_hash(
469    tx: &Transaction<'_>,
470    item_id: i64,
471    derivation_hash: &str,
472) -> Result<Option<i64>, AppError> {
473    tx.query_row(
474        "select derivation_version
475         from item_derivations
476         where item_id = ?1 and derivation_hash = ?2
477         limit 1",
478        params![item_id, derivation_hash],
479        |row| row.get(0),
480    )
481    .map(Some)
482    .or_else(|err| match err {
483        rusqlite::Error::QueryReturnedNoRows => Ok(None),
484        other => Err(AppError::db_query(other)),
485    })
486}
487
488fn next_derivation_version(tx: &Transaction<'_>, item_id: i64) -> Result<i64, AppError> {
489    let next_version = tx
490        .query_row(
491            "select coalesce(max(derivation_version), 0) + 1
492             from item_derivations
493             where item_id = ?1",
494            params![item_id],
495            |row| row.get(0),
496        )
497        .map_err(AppError::db_query)?;
498    Ok(next_version)
499}
500
501fn attach_tags(tx: &Transaction<'_>, derivation_id: i64, tags: &[String]) -> Result<(), AppError> {
502    let mut seen = HashSet::new();
503    for tag in tags {
504        let trimmed = tag.trim();
505        if trimmed.is_empty() {
506            continue;
507        }
508        let normalized = trimmed.to_lowercase();
509        if !seen.insert(normalized.clone()) {
510            continue;
511        }
512
513        tx.execute(
514            "insert into tags(tag_name, tag_name_norm)
515             values(?1, ?2)
516             on conflict(tag_name_norm) do update set tag_name = excluded.tag_name",
517            params![trimmed, normalized],
518        )
519        .map_err(AppError::db_write)?;
520
521        let tag_id: i64 = tx
522            .query_row(
523                "select tag_id from tags where tag_name_norm = ?1",
524                params![normalized],
525                |row| row.get(0),
526            )
527            .map_err(AppError::db_query)?;
528
529        tx.execute(
530            "insert or ignore into item_tags(derivation_id, tag_id) values (?1, ?2)",
531            params![derivation_id, tag_id],
532        )
533        .map_err(AppError::db_write)?;
534    }
535
536    Ok(())
537}