1use std::collections::HashSet;
2
3use rusqlite::{Transaction, params};
4use serde::Serialize;
5
6use crate::errors::AppError;
7use crate::preprocess::{self, ValidationStatus};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum IncomingStatus {
11 Accepted,
12 Rejected,
13 Conflict,
14}
15
16impl IncomingStatus {
17 pub fn parse(value: &str) -> Option<Self> {
18 match value {
19 "accepted" => Some(Self::Accepted),
20 "rejected" => Some(Self::Rejected),
21 "conflict" => Some(Self::Conflict),
22 _ => None,
23 }
24 }
25}
26
27#[derive(Debug, Clone)]
28pub struct ApplyInputItem {
29 pub item_id: i64,
30 pub status: IncomingStatus,
31 pub derivation_hash: String,
32 pub base_derivation_id: Option<i64>,
33 pub summary: Option<String>,
34 pub category: Option<String>,
35 pub priority: Option<String>,
36 pub due_at: Option<String>,
37 pub normalized_text: Option<String>,
38 pub confidence: Option<f64>,
39 pub content_type: Option<String>,
40 pub validation_status: Option<String>,
41 pub validation_errors: Option<Vec<ApplyValidationError>>,
42 pub payload_json: serde_json::Value,
43 pub conflict_reason: Option<String>,
44 pub tags: Vec<String>,
45 pub agent_run_id: Option<String>,
46}
47
48#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
49pub struct ApplyValidationError {
50 pub code: String,
51 pub message: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub path: Option<String>,
54}
55
56#[derive(Debug, Clone, Serialize)]
57pub struct ApplyItemError {
58 pub code: String,
59 pub message: String,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 pub details: Option<serde_json::Value>,
62}
63
64#[derive(Debug, Clone, Serialize)]
65pub struct ApplyItemOutcome {
66 pub item_id: i64,
67 pub status: String,
68 #[serde(skip_serializing_if = "Option::is_none")]
69 pub derivation_version: Option<i64>,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub content_type: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub validation_status: Option<String>,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub validation_errors: Option<Vec<ApplyValidationError>>,
76 #[serde(skip_serializing_if = "Option::is_none")]
77 pub error: Option<ApplyItemError>,
78}
79
80impl ApplyItemOutcome {
81 pub fn content_type(&self) -> Option<&str> {
82 self.content_type.as_deref()
83 }
84
85 pub fn validation_status(&self) -> Option<&str> {
86 self.validation_status.as_deref()
87 }
88
89 pub fn validation_errors(&self) -> Option<&[ApplyValidationError]> {
90 self.validation_errors.as_deref()
91 }
92}
93
94#[derive(Debug, Clone, Serialize)]
95pub struct ApplySummary {
96 pub dry_run: bool,
97 pub processed: i64,
98 pub accepted: i64,
99 pub skipped: i64,
100 pub failed: i64,
101 pub items: Vec<ApplyItemOutcome>,
102}
103
104#[derive(Debug, Clone, Default)]
105struct ResolvedMetadata {
106 content_type: Option<String>,
107 validation_status: Option<String>,
108 validation_errors: Option<Vec<ApplyValidationError>>,
109}
110
111impl ResolvedMetadata {
112 fn from_item(item: &ApplyInputItem) -> Self {
113 Self {
114 content_type: item.content_type.clone(),
115 validation_status: item.validation_status.clone(),
116 validation_errors: item.validation_errors.clone(),
117 }
118 }
119}
120
121pub fn apply_items(
122 tx: &Transaction<'_>,
123 items: &[ApplyInputItem],
124 dry_run: bool,
125 default_agent_run_id: &str,
126) -> Result<ApplySummary, AppError> {
127 let mut accepted = 0_i64;
128 let mut skipped = 0_i64;
129 let mut failed = 0_i64;
130 let mut outcomes = Vec::with_capacity(items.len());
131
132 for item in items {
133 let mut metadata = ResolvedMetadata::from_item(item);
134
135 if !item_exists(tx, item.item_id)? {
136 failed += 1;
137 outcomes.push(build_outcome(
138 item.item_id,
139 "failed",
140 None,
141 &metadata,
142 Some(ApplyItemError {
143 code: "invalid-apply-payload".to_string(),
144 message: "item_id does not exist".to_string(),
145 details: None,
146 }),
147 ));
148 continue;
149 }
150
151 populate_missing_metadata(tx, item.item_id, &mut metadata)?;
152
153 let active = current_active(tx, item.item_id)?;
154 if let Some(base_derivation_id) = item.base_derivation_id
155 && active.map(|row| row.0) != Some(base_derivation_id)
156 {
157 skipped += 1;
158 outcomes.push(build_outcome(
159 item.item_id,
160 "skipped",
161 None,
162 &metadata,
163 Some(ApplyItemError {
164 code: "apply-item-conflict".to_string(),
165 message: "incoming base derivation does not match active derivation"
166 .to_string(),
167 details: Some(serde_json::json!({
168 "incoming_base_derivation_id": base_derivation_id,
169 "active_derivation_id": active.map(|row| row.0),
170 })),
171 }),
172 ));
173 continue;
174 }
175
176 if let Some(existing_version) =
177 derivation_version_by_hash(tx, item.item_id, &item.derivation_hash)?
178 {
179 skipped += 1;
180 outcomes.push(build_outcome(
181 item.item_id,
182 "skipped",
183 Some(existing_version),
184 &metadata,
185 None,
186 ));
187 continue;
188 }
189
190 if item.status == IncomingStatus::Conflict && item.conflict_reason.is_none() {
191 failed += 1;
192 outcomes.push(build_outcome(
193 item.item_id,
194 "failed",
195 None,
196 &metadata,
197 Some(ApplyItemError {
198 code: "invalid-apply-payload".to_string(),
199 message: "status=conflict requires conflict_reason".to_string(),
200 details: None,
201 }),
202 ));
203 continue;
204 }
205
206 let next_version = next_derivation_version(tx, item.item_id)?;
207 if dry_run {
208 accepted += 1;
209 outcomes.push(build_outcome(
210 item.item_id,
211 "accepted",
212 Some(next_version),
213 &metadata,
214 None,
215 ));
216 continue;
217 }
218
219 if item.status == IncomingStatus::Accepted {
220 tx.execute(
221 "update item_derivations
222 set is_active = 0
223 where item_id = ?1 and is_active = 1 and status = 'accepted'",
224 params![item.item_id],
225 )
226 .map_err(AppError::db_write)?;
227 }
228
229 let payload_json = serde_json::to_string(&merge_payload_with_metadata(
230 item.payload_json.clone(),
231 &metadata,
232 ))
233 .map_err(|err| {
234 AppError::invalid_apply_payload(
235 format!(
236 "payload serialization failed for item {}: {err}",
237 item.item_id
238 ),
239 None,
240 )
241 })?;
242 let status_str = match item.status {
243 IncomingStatus::Accepted => "accepted",
244 IncomingStatus::Rejected => "rejected",
245 IncomingStatus::Conflict => "conflict",
246 };
247 let is_active = if item.status == IncomingStatus::Accepted {
248 1
249 } else {
250 0
251 };
252 let agent_run_id = item
253 .agent_run_id
254 .as_deref()
255 .filter(|value| !value.trim().is_empty())
256 .unwrap_or(default_agent_run_id);
257
258 tx.execute(
259 "insert into item_derivations(
260 item_id,
261 derivation_version,
262 status,
263 is_active,
264 base_derivation_id,
265 derivation_hash,
266 agent_run_id,
267 summary,
268 category,
269 priority,
270 due_at,
271 normalized_text,
272 confidence,
273 payload_json,
274 conflict_reason
275 ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
276 params![
277 item.item_id,
278 next_version,
279 status_str,
280 is_active,
281 item.base_derivation_id,
282 item.derivation_hash,
283 agent_run_id,
284 item.summary,
285 item.category,
286 item.priority,
287 item.due_at,
288 item.normalized_text,
289 item.confidence,
290 payload_json,
291 item.conflict_reason,
292 ],
293 )
294 .map_err(|err| {
295 if let rusqlite::Error::SqliteFailure(native, _) = &err
296 && native.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE
297 {
298 return AppError::runtime("duplicate derivation hash for item")
299 .with_code("apply-item-conflict");
300 }
301 AppError::db_write(err)
302 })?;
303 let derivation_id = tx.last_insert_rowid();
304
305 if item.status == IncomingStatus::Accepted {
306 let tag_list = tags_with_metadata(&item.tags, &metadata);
307 attach_tags(tx, derivation_id, &tag_list)?;
308 }
309
310 accepted += 1;
311 outcomes.push(build_outcome(
312 item.item_id,
313 "accepted",
314 Some(next_version),
315 &metadata,
316 None,
317 ));
318 }
319
320 Ok(ApplySummary {
321 dry_run,
322 processed: items.len() as i64,
323 accepted,
324 skipped,
325 failed,
326 items: outcomes,
327 })
328}
329
330fn build_outcome(
331 item_id: i64,
332 status: &str,
333 derivation_version: Option<i64>,
334 metadata: &ResolvedMetadata,
335 error: Option<ApplyItemError>,
336) -> ApplyItemOutcome {
337 ApplyItemOutcome {
338 item_id,
339 status: status.to_string(),
340 derivation_version,
341 content_type: metadata.content_type.clone(),
342 validation_status: metadata.validation_status.clone(),
343 validation_errors: metadata.validation_errors.clone(),
344 error,
345 }
346}
347
348fn populate_missing_metadata(
349 tx: &Transaction<'_>,
350 item_id: i64,
351 metadata: &mut ResolvedMetadata,
352) -> Result<(), AppError> {
353 let needs_content_type = metadata.content_type.is_none();
354 let needs_status = metadata.validation_status.is_none();
355 let needs_invalid_errors = matches!(metadata.validation_status.as_deref(), Some("invalid"))
356 && metadata.validation_errors.is_none();
357
358 if !needs_content_type && !needs_status && !needs_invalid_errors {
359 return Ok(());
360 }
361
362 let raw_text: String = tx
363 .query_row(
364 "select raw_text from inbox_items where item_id = ?1",
365 params![item_id],
366 |row| row.get(0),
367 )
368 .map_err(AppError::db_query)?;
369 let analyzed = preprocess::analyze(&raw_text);
370
371 if needs_content_type {
372 metadata.content_type = Some(analyzed.content_type.as_str().to_string());
373 }
374 if needs_status {
375 metadata.validation_status = Some(analyzed.validation.status.as_str().to_string());
376 }
377
378 let invalid_from_analysis = matches!(analyzed.validation.status, ValidationStatus::Invalid);
379 if (metadata.validation_errors.is_none() && invalid_from_analysis) || needs_invalid_errors {
380 metadata.validation_errors = analyzed.validation.errors.map(|errors| {
381 errors
382 .into_iter()
383 .map(|err| ApplyValidationError {
384 code: err.code,
385 message: err.message,
386 path: err.path,
387 })
388 .collect::<Vec<_>>()
389 });
390 }
391
392 Ok(())
393}
394
395fn tags_with_metadata(base_tags: &[String], metadata: &ResolvedMetadata) -> Vec<String> {
396 let mut tags = base_tags.to_vec();
397 if let Some(content_type) = &metadata.content_type {
398 tags.push(format!("fmt:{content_type}"));
399 }
400 if let Some(validation_status) = &metadata.validation_status {
401 tags.push(format!("val:{validation_status}"));
402 }
403 tags
404}
405
406fn merge_payload_with_metadata(
407 payload_json: serde_json::Value,
408 metadata: &ResolvedMetadata,
409) -> serde_json::Value {
410 let mut map = match payload_json {
411 serde_json::Value::Object(object) => object,
412 other => {
413 let mut object = serde_json::Map::new();
414 object.insert("payload".to_string(), other);
415 object
416 }
417 };
418
419 if let Some(content_type) = &metadata.content_type {
420 map.insert(
421 "content_type".to_string(),
422 serde_json::Value::String(content_type.clone()),
423 );
424 }
425 if let Some(validation_status) = &metadata.validation_status {
426 map.insert(
427 "validation_status".to_string(),
428 serde_json::Value::String(validation_status.clone()),
429 );
430 }
431 if let Some(validation_errors) = &metadata.validation_errors
432 && let Ok(encoded) = serde_json::to_value(validation_errors)
433 {
434 map.insert("validation_errors".to_string(), encoded);
435 }
436
437 serde_json::Value::Object(map)
438}
439
440fn item_exists(tx: &Transaction<'_>, item_id: i64) -> Result<bool, AppError> {
441 let count: i64 = tx
442 .query_row(
443 "select count(*) from inbox_items where item_id = ?1",
444 params![item_id],
445 |row| row.get(0),
446 )
447 .map_err(AppError::db_query)?;
448 Ok(count > 0)
449}
450
451fn current_active(tx: &Transaction<'_>, item_id: i64) -> Result<Option<(i64, i64)>, AppError> {
452 tx.query_row(
453 "select derivation_id, derivation_version
454 from item_derivations
455 where item_id = ?1 and is_active = 1 and status = 'accepted'
456 order by derivation_version desc, derivation_id desc
457 limit 1",
458 params![item_id],
459 |row| Ok((row.get(0)?, row.get(1)?)),
460 )
461 .map(Some)
462 .or_else(|err| match err {
463 rusqlite::Error::QueryReturnedNoRows => Ok(None),
464 other => Err(AppError::db_query(other)),
465 })
466}
467
468fn derivation_version_by_hash(
469 tx: &Transaction<'_>,
470 item_id: i64,
471 derivation_hash: &str,
472) -> Result<Option<i64>, AppError> {
473 tx.query_row(
474 "select derivation_version
475 from item_derivations
476 where item_id = ?1 and derivation_hash = ?2
477 limit 1",
478 params![item_id, derivation_hash],
479 |row| row.get(0),
480 )
481 .map(Some)
482 .or_else(|err| match err {
483 rusqlite::Error::QueryReturnedNoRows => Ok(None),
484 other => Err(AppError::db_query(other)),
485 })
486}
487
488fn next_derivation_version(tx: &Transaction<'_>, item_id: i64) -> Result<i64, AppError> {
489 let next_version = tx
490 .query_row(
491 "select coalesce(max(derivation_version), 0) + 1
492 from item_derivations
493 where item_id = ?1",
494 params![item_id],
495 |row| row.get(0),
496 )
497 .map_err(AppError::db_query)?;
498 Ok(next_version)
499}
500
501fn attach_tags(tx: &Transaction<'_>, derivation_id: i64, tags: &[String]) -> Result<(), AppError> {
502 let mut seen = HashSet::new();
503 for tag in tags {
504 let trimmed = tag.trim();
505 if trimmed.is_empty() {
506 continue;
507 }
508 let normalized = trimmed.to_lowercase();
509 if !seen.insert(normalized.clone()) {
510 continue;
511 }
512
513 tx.execute(
514 "insert into tags(tag_name, tag_name_norm)
515 values(?1, ?2)
516 on conflict(tag_name_norm) do update set tag_name = excluded.tag_name",
517 params![trimmed, normalized],
518 )
519 .map_err(AppError::db_write)?;
520
521 let tag_id: i64 = tx
522 .query_row(
523 "select tag_id from tags where tag_name_norm = ?1",
524 params![normalized],
525 |row| row.get(0),
526 )
527 .map_err(AppError::db_query)?;
528
529 tx.execute(
530 "insert or ignore into item_tags(derivation_id, tag_id) values (?1, ?2)",
531 params![derivation_id, tag_id],
532 )
533 .map_err(AppError::db_write)?;
534 }
535
536 Ok(())
537}