canon-archive 0.2.2

A CLI tool for organizing large media libraries into a canonical archive
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
use anyhow::{Context, Result};
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::io::{self, BufRead};
use std::time::{SystemTime, UNIX_EPOCH};

use crate::domain::fact::{is_content_fact, normalize_fact_key, FactValueType};
use crate::repo::{self, Connection, Db};

/// One JSONL record from stdin: a batch of facts observed for a single
/// source at a specific basis revision.
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
#[allow(dead_code)] // Worklist pass-through fields are intentionally accepted but not read
struct FactImport {
    /// Id of the source row these facts belong to.
    source_id: i64,
    /// Source revision the facts were computed against; records whose
    /// basis_rev no longer matches the source are skipped as stale.
    basis_rev: i64,
    /// Unix timestamp of observation; defaults to "now" when omitted.
    #[serde(default = "current_timestamp")]
    observed_at: i64,
    /// Fact key → value. Values are either plain JSON or the
    /// `{"value": ..., "type": "..."}` hint wrapper (see `TypedValue`).
    facts: HashMap<String, Value>,
    // Optional fields from worklist pass-through (allowed but ignored)
    #[serde(default)]
    path: Option<String>,
    #[serde(default)]
    root_id: Option<i64>,
    #[serde(default)]
    size: Option<i64>,
    #[serde(default)]
    mtime: Option<i64>,
}

/// Current wall-clock time in seconds since the Unix epoch.
///
/// Used as the default `observed_at` for import records that omit it.
///
/// # Panics
/// Panics if the system clock is set before the Unix epoch.
fn current_timestamp() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    since_epoch.as_secs() as i64
}

/// Counters accumulated over one import run and reported in the final
/// summary line on stdout.
#[derive(Default)]
struct ImportStats {
    /// Non-empty stdin lines seen, including ones that failed to parse.
    lines_processed: u64,
    /// Facts upserted onto either a source or an object.
    facts_imported: u64,
    /// Records skipped because their basis_rev no longer matches the source.
    skipped_stale: u64,
    /// Facts skipped because key normalization rejected them (reserved/invalid).
    skipped_reserved: u64,
    /// Records skipped because the source lives in an "archive" root.
    skipped_archived: u64,
    /// Facts skipped for type conflicts or unparseable type hints.
    skipped_type_mismatch: u64,
    /// Sources newly linked (or re-linked) to a content object.
    objects_created: u64,
    /// Facts promoted from source storage to object storage.
    facts_promoted: u64,
}

/// A fact value with optional type hint
enum TypedValue {
    /// Raw JSON value, stored according to its JSON type (no date guessing).
    Plain(Value),
    /// Value wrapped as `{"value": ..., "type": "..."}`; the hint selects
    /// a parser ("duration" or "datetime").
    Hinted { value: Value, type_hint: String },
}

impl TypedValue {
    /// Interpret a raw JSON value, recognizing the `{"value": ..., "type": "..."}`
    /// hint wrapper. Any other shape — including objects missing either key,
    /// or whose "type" is not a string — is treated as a plain value.
    fn parse(v: &Value) -> Self {
        let hinted = match v {
            Value::Object(obj) => match (obj.get("value"), obj.get("type")) {
                (Some(value), Some(Value::String(type_hint))) => Some(TypedValue::Hinted {
                    value: value.clone(),
                    type_hint: type_hint.clone(),
                }),
                _ => None,
            },
            _ => None,
        };
        hinted.unwrap_or_else(|| TypedValue::Plain(v.clone()))
    }
}

/// Build a map of known fact keys to their established types.
///
/// Fetched once at the start of a run; the caller then keeps the in-memory
/// map up to date as new keys arrive so type conflicts are caught even
/// within a single input stream.
fn build_fact_type_map(conn: &Connection) -> Result<HashMap<String, FactValueType>> {
    repo::fact::fetch_type_map(conn)
}

/// Import facts from JSONL on stdin, one `FactImport` record per line.
///
/// Malformed lines and per-record failures are reported on stderr and
/// skipped — a single bad record never aborts the run. After the stream
/// ends, prints a remediation hint for any type mismatches (stderr), a
/// per-category summary (stdout), and refreshes query-planner statistics
/// since the import may have bulk-changed fact tables.
pub fn run(db: &mut Db, allow_archived: bool, verbose: bool) -> Result<()> {
    let conn = db.conn_mut();
    let stdin = io::stdin();
    let mut stats = ImportStats::default();

    // Build type map once at start for efficient type checking
    // This map is updated during import to catch mixed types in the input stream
    let mut fact_type_map = build_fact_type_map(conn)?;

    // Track which keys had type mismatches (for summary)
    let mut type_mismatch_keys: HashMap<String, (FactValueType, FactValueType)> = HashMap::new();

    for line in stdin.lock().lines() {
        let line = line.context("Failed to read line from stdin")?;
        // Blank lines are ignored and not counted.
        if line.trim().is_empty() {
            continue;
        }

        stats.lines_processed += 1;

        // Parse failures are warnings, not errors: continue with the next line.
        let import: FactImport = match serde_json::from_str(&line) {
            Ok(i) => i,
            Err(e) => {
                eprintln!(
                    "Warning: Failed to parse line {}: {}",
                    stats.lines_processed, e
                );
                continue;
            }
        };

        // Per-record failures are likewise reported and skipped.
        match process_import(
            conn,
            &import,
            &mut stats,
            &mut fact_type_map,
            &mut type_mismatch_keys,
            allow_archived,
            verbose,
        ) {
            Ok(_) => {}
            Err(e) => {
                eprintln!(
                    "Warning: Failed to process source_id {}: {}",
                    import.source_id, e
                );
            }
        }
    }

    // Print type mismatch warnings with remediation hint
    if !type_mismatch_keys.is_empty() {
        eprintln!("\nType mismatch warnings:");
        // Sort keys for stable, readable output.
        let mut keys: Vec<_> = type_mismatch_keys.iter().collect();
        keys.sort_by_key(|(k, _)| *k);
        for (key, (existing, attempted)) in keys {
            eprintln!(
                "  {key}: existing type is {existing}, attempted to import {attempted}"
            );
        }
        eprintln!("\nTo change the type, first delete existing facts:");
        eprintln!("  canon facts delete --key <key>");
        eprintln!("Then re-import with the new type.");
    }

    println!(
        "Processed {} lines: {} facts imported, {} skipped (stale), {} skipped (reserved), {} skipped (archived), {} skipped (type mismatch), {} objects created, {} facts promoted",
        stats.lines_processed,
        stats.facts_imported,
        stats.skipped_stale,
        stats.skipped_reserved,
        stats.skipped_archived,
        stats.skipped_type_mismatch,
        stats.objects_created,
        stats.facts_promoted
    );

    // Update query planner statistics after bulk changes
    db.run_analyze()?;

    Ok(())
}

/// Apply one `FactImport` record.
///
/// Steps: validate that the source exists, is not archived (unless
/// `allow_archived`), and that `basis_rev` still matches; normalize fact
/// keys; if a `content.hash.sha256` fact is present, link the source to its
/// content object (promoting existing source facts atomically); finally
/// upsert each fact on the object when linked, otherwise on the source.
///
/// Returns `Ok(())` even when the record is skipped — every skip reason is
/// counted in `stats` and reported on stderr.
fn process_import(
    conn: &mut Connection,
    import: &FactImport,
    stats: &mut ImportStats,
    fact_type_map: &mut HashMap<String, FactValueType>,
    type_mismatch_keys: &mut HashMap<String, (FactValueType, FactValueType)>,
    allow_archived: bool,
    verbose: bool,
) -> Result<()> {
    // Check if source exists and get its data
    let source = match repo::source::fetch_by_id(conn, import.source_id)? {
        Some(s) => s,
        None => {
            eprintln!("Warning: source_id {} not found", import.source_id);
            return Ok(());
        }
    };

    let current_basis_rev = source.basis_rev;
    let current_object_id = source.object_id;
    let root_path = &source.root_path;
    let rel_path = &source.rel_path;

    // Check if source is in an archive root
    if source.is_from_role("archive") && !allow_archived {
        stats.skipped_archived += 1;
        return Ok(());
    }

    // A stale basis_rev means the file changed after the facts were computed.
    if current_basis_rev != import.basis_rev {
        eprintln!(
            "Warning: source_id {} has basis_rev {} but import has {}, skipping",
            import.source_id, current_basis_rev, import.basis_rev
        );
        stats.skipped_stale += 1;
        return Ok(());
    }

    // Normalize all fact keys first, collecting valid ones
    let mut normalized_facts: Vec<(String, &Value)> = Vec::new();
    for (key, value) in &import.facts {
        match normalize_fact_key(key) {
            Ok(normalized_key) => normalized_facts.push((normalized_key, value)),
            Err(msg) => {
                eprintln!("Warning: skipping fact '{key}': {msg}");
                stats.skipped_reserved += 1;
            }
        }
    }

    // Check for content hash and process it first
    // Support both old format (hash.sha256) and new format (content.hash.sha256)
    // NOTE(review): only "content.hash.sha256" is matched here — presumably
    // normalize_fact_key above rewrites the old key; verify in domain::fact.
    let mut object_id = current_object_id;
    let hash_value = normalized_facts
        .iter()
        .find(|(k, _)| k == "content.hash.sha256")
        .map(|(_, v)| *v);

    if let Some(hash_val) = hash_value {
        if let Some(hash_str) = hash_val.as_str() {
            let obj = repo::object::get_or_create(conn, "sha256", hash_str)?;
            // Check if this was a new object (not previously linked to this source)
            // NOTE(review): this also counts re-links to a different object,
            // so `objects_created` is really "links created or changed".
            if current_object_id.is_none() || current_object_id != Some(obj.id) {
                stats.objects_created += 1;
            }
            object_id = Some(obj.id);

            // Link source to object AND promote existing facts in a single transaction
            // This ensures atomicity: either both succeed or neither does
            if current_object_id.is_none() {
                let tx = conn.transaction()?;
                repo::source::set_object_id(&tx, import.source_id, obj.id)?;
                let promoted = promote_content_facts(&tx, import.source_id, obj.id)?;
                tx.commit()?;
                stats.facts_promoted += promoted;
            } else if current_object_id != object_id {
                // Just re-linking to a different object (rare case)
                repo::source::set_object_id(conn, import.source_id, obj.id)?;
            }
        }
    }

    // Import facts - all imported facts are content facts (stored on object when available)
    if verbose && !normalized_facts.is_empty() {
        eprintln!("[{root_path}] {rel_path}");
    }
    for (key, value) in &normalized_facts {
        // Parse as typed value (plain or with hint)
        let typed = TypedValue::parse(value);

        // Check type consistency
        let new_type = match get_typed_value_type(&typed) {
            Some(t) => t,
            None => {
                // Type hint parsing failed - classify_typed_value will give details
                if let Err(e) = classify_typed_value(&typed) {
                    eprintln!("Warning: {e} for '{key}' in {root_path}/{rel_path}");
                }
                stats.skipped_type_mismatch += 1;
                continue;
            }
        };

        if let Some(&existing_type) = fact_type_map.get(key) {
            if existing_type != new_type {
                // Type mismatch - skip this fact
                let is_new_key = !type_mismatch_keys.contains_key(key);
                type_mismatch_keys
                    .entry(key.clone())
                    .or_insert((existing_type, new_type));
                if is_new_key {
                    // First occurrence of this mismatch - show the path and value
                    eprintln!(
                        "Warning: type mismatch for '{key}' in {root_path}/{rel_path}: existing {existing_type}, got {new_type} (value: {value})"
                    );
                }
                stats.skipped_type_mismatch += 1;
                continue;
            }
        } else {
            // New key - register its type for subsequent records in this import session
            fact_type_map.insert(key.clone(), new_type);
        }

        // Classify for storage
        let (value_text, value_num, value_time) = match classify_typed_value(&typed) {
            Ok(v) => v,
            Err(e) => {
                eprintln!("Warning: {e}");
                stats.skipped_type_mismatch += 1;
                continue;
            }
        };

        if object_id.is_some() {
            if verbose {
                eprintln!("  {key}: {value} (on object)");
            }
            // Store as object fact
            // NOTE(review): writing directly to the object bumps both
            // facts_imported and facts_promoted — so "promoted" includes facts
            // that never lived on the source.
            repo::fact::upsert(
                conn,
                "object",
                object_id.unwrap(),
                key,
                value_text.as_deref(),
                value_num,
                value_time,
                import.observed_at,
                None, // object facts don't have observed_basis_rev
            )?;
            stats.facts_imported += 1;
            stats.facts_promoted += 1;
        } else {
            if verbose {
                eprintln!("  {key}: {value} (on source)");
            }
            // Store as source fact for now (will be promoted later when hash is known)
            repo::fact::upsert(
                conn,
                "source",
                import.source_id,
                key,
                value_text.as_deref(),
                value_num,
                value_time,
                import.observed_at,
                Some(import.basis_rev),
            )?;
            stats.facts_imported += 1;
        }
    }

    Ok(())
}

/// Try to parse a string as a datetime, returning the Unix timestamp if successful.
///
/// Accepted formats, tried in order:
/// - RFC3339 ("2020-01-15T10:30:00+00:00")
/// - ISO without timezone ("2020-01-15T10:30:00"), interpreted as UTC
/// - EXIF ("2020:07:23 11:06:32", optional subseconds/offset), interpreted
///   as UTC when no offset is present
/// - A plain 4-digit year ("2005") → Jan 1 midnight UTC
fn try_parse_datetime(s: &str) -> Option<i64> {
    // RFC3339 format (2020-01-15T10:30:00+00:00)
    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
        return Some(dt.timestamp());
    }
    // ISO format without timezone (2020-01-15T10:30:00)
    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
        return Some(dt.and_utc().timestamp());
    }

    // EXIF format uses colons in date: "2020:07:23 11:06:32"
    // Normalize to ISO format: "2020-07-23T11:06:32" then parse.
    // (s.len() is a byte count while nth() counts chars; they agree for the
    // ASCII datetimes this branch targets.)
    if s.len() >= 19 && s.chars().nth(4) == Some(':') && s.chars().nth(7) == Some(':') {
        // Convert "2020:07:23 11:06:32..." to "2020-07-23T11:06:32..."
        let iso: String = s
            .chars()
            .enumerate()
            .map(|(i, c)| match i {
                4 | 7 => '-',          // date colons → dashes
                10 if c == ' ' => 'T', // space → T
                _ => c,
            })
            .collect();

        // Try with timezone (handles subseconds too: 2020-07-23T11:06:32.023+02:00)
        if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&iso) {
            return Some(dt.timestamp());
        }
        // Try with Z suffix
        if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&format!("{}Z", &iso)) {
            return Some(dt.timestamp());
        }
        // Try without timezone (assume UTC). Use get() instead of iso[..19]:
        // a multibyte char within the first 19 bytes would make byte 19 a
        // non-boundary and the slice would panic on crafted input.
        if let Some(prefix) = iso.get(..19) {
            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(prefix, "%Y-%m-%dT%H:%M:%S") {
                return Some(dt.and_utc().timestamp());
            }
        }
    }

    // Plain 4-digit year string (e.g., "2005") - Jan 1 midnight UTC
    if s.len() == 4 {
        if let Ok(year) = s.parse::<i32>() {
            return year_to_timestamp(year);
        }
    }

    None
}

/// Convert a year to a Unix timestamp (Jan 1 midnight UTC).
///
/// Returns `None` for years outside the 1900..=2100 sanity range or for
/// dates chrono cannot represent unambiguously.
fn year_to_timestamp(year: i32) -> Option<i64> {
    use chrono::TimeZone;
    // Sanity check for reasonable year range
    if !(1900..=2100).contains(&year) {
        return None;
    }
    let midnight = chrono::Utc.with_ymd_and_hms(year, 1, 1, 0, 0, 0).single()?;
    Some(midnight.timestamp())
}

/// Parse duration string to seconds. Supports:
/// - "H:MM:SS" or "HH:MM:SS" (hours:minutes:seconds)
/// - "M:SS" or "MM:SS" (minutes:seconds)
/// - "SS" or "SS.ms" (seconds with optional decimals)
/// - Numeric values (already in seconds)
fn try_parse_duration(value: &Value) -> Option<f64> {
    match value {
        Value::Number(n) => n.as_f64(),
        Value::String(raw) => {
            let trimmed = raw.trim();
            // Parse every colon-separated field as f64; any failure → None.
            let fields: Vec<f64> = trimmed
                .split(':')
                .map(|part| part.parse::<f64>())
                .collect::<Result<_, _>>()
                .ok()?;
            match fields.as_slice() {
                [hours, mins, secs] => Some(hours * 3600.0 + mins * 60.0 + secs),
                [mins, secs] => Some(mins * 60.0 + secs),
                [secs] => Some(*secs),
                // Zero or 4+ fields is not a recognized duration shape
                _ => None,
            }
        }
        _ => None,
    }
}

/// Classify a plain value (no type hint) into its (text, num, time) storage
/// columns. Strings are stored verbatim — deliberately NO date guessing.
fn classify_value(value: &Value) -> (Option<String>, Option<f64>, Option<i64>) {
    match value {
        Value::String(s) => (Some(s.clone()), None, None), // Just store as text
        Value::Number(n) => {
            // Prefer the integer reading; fall back to float, then to the
            // textual form for numbers representable as neither.
            match n.as_i64().map(|i| i as f64).or_else(|| n.as_f64()) {
                Some(f) => (None, Some(f), None),
                None => (Some(n.to_string()), None, None),
            }
        }
        // Booleans become 1.0 / 0.0 so they sort and filter numerically
        Value::Bool(b) => (None, Some(f64::from(u8::from(*b))), None),
        Value::Null => (Some(String::new()), None, None),
        // Store arrays/objects as their JSON string representation
        Value::Array(_) | Value::Object(_) => (Some(value.to_string()), None, None),
    }
}

/// Classify a value with optional type hint into its (text, num, time)
/// storage columns. Returns a human-readable error when a hinted value
/// cannot be parsed according to its hint.
fn classify_typed_value(
    typed: &TypedValue,
) -> Result<(Option<String>, Option<f64>, Option<i64>), String> {
    // Plain values never fail classification; peel off the hinted case.
    let (value, type_hint) = match typed {
        TypedValue::Plain(v) => return Ok(classify_value(v)),
        TypedValue::Hinted { value, type_hint } => (value, type_hint),
    };

    match type_hint.as_str() {
        "duration" => try_parse_duration(value)
            .map(|secs| (None, Some(secs), None))
            .ok_or_else(|| format!("cannot parse as duration: {value}")),
        "datetime" => match value {
            Value::String(s) => try_parse_datetime(s)
                .map(|ts| (None, None, Some(ts)))
                .ok_or_else(|| format!("cannot parse as datetime: {value}")),
            // Handle numeric year (e.g., 2005 from audio metadata)
            Value::Number(n) => match n.as_i64() {
                Some(year) => year_to_timestamp(year as i32)
                    .map(|ts| (None, None, Some(ts)))
                    .ok_or_else(|| format!("year out of range: {year}")),
                None => Err(format!("datetime requires integer year, got: {value}")),
            },
            _ => Err(format!("datetime requires string or year, got: {value}")),
        },
        unknown => Err(format!("unknown type hint: {unknown}")),
    }
}

/// Determine storage type for a plain value — strings are always `Text`
/// (deliberately NO date guessing); numbers and booleans are `Num`;
/// everything else serializes to text.
fn get_value_type(value: &Value) -> FactValueType {
    match value {
        Value::Number(_) | Value::Bool(_) => FactValueType::Num,
        Value::String(_) | Value::Null | Value::Array(_) | Value::Object(_) => FactValueType::Text,
    }
}

/// Determine storage type with type hint
fn get_typed_value_type(typed: &TypedValue) -> Option<FactValueType> {
    match typed {
        TypedValue::Plain(v) => Some(get_value_type(v)),
        TypedValue::Hinted { value, type_hint } => match type_hint.as_str() {
            "duration" => {
                if try_parse_duration(value).is_some() {
                    Some(FactValueType::Num)
                } else {
                    None
                }
            }
            "datetime" => {
                if let Value::String(s) = value {
                    if try_parse_datetime(s).is_some() {
                        Some(FactValueType::Time)
                    } else {
                        None
                    }
                } else if let Value::Number(n) = value {
                    // Numeric year (e.g., 2005)
                    if let Some(year) = n.as_i64() {
                        if year_to_timestamp(year as i32).is_some() {
                            Some(FactValueType::Time)
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
            }
            _ => None, // Unknown hint
        },
    }
}

/// Promote content facts from a source to its newly-linked object.
///
/// This function is called within a transaction that also links the source to the object,
/// ensuring atomicity. It:
/// 1. Fetches all facts on the source
/// 2. For each content fact (content.* namespace):
///    - If the object doesn't already have this fact, copies it to the object
///    - Deletes the source fact
///
/// Note: when the object already has a fact with the same key, the source's
/// copy is deleted WITHOUT being promoted — the object's existing value wins.
/// Non-content facts are left untouched on the source.
///
/// Returns the number of facts actually copied to the object.
fn promote_content_facts(conn: &Connection, source_id: i64, object_id: i64) -> Result<u64> {
    // Fetch all facts on this source
    let facts = repo::fact::fetch_source_facts(conn, source_id)?;

    let mut promoted = 0u64;
    for fact in facts {
        if is_content_fact(&fact.key) {
            // Check if object already has this fact
            if !repo::fact::object_has_fact(conn, object_id, &fact.key)? {
                // Copy to object
                repo::fact::insert_object_fact(
                    conn,
                    object_id,
                    &fact.key,
                    fact.value_text.as_deref(),
                    fact.value_num,
                    fact.value_time,
                    fact.observed_at,
                )?;
                promoted += 1;
            }

            // Delete from source (even when the object already had the fact)
            repo::fact::delete_by_id(conn, fact.id)?;
        }
    }

    Ok(promoted)
}