mx 0.1.199

A Swiss army knife for Claude Code and multi-agent toolkits
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
use anyhow::{Result, bail};

use crate::cli::EntryFilter;
use crate::index::IndexConfig;
use crate::knowledge;
use crate::store;
use crate::surreal_db::SurrealDatabase;

/// Narrow a list of knowledge entries using the presence/absence flags, tag
/// list and optional limit carried by `filter`.
///
/// An entry is kept only if it passes every active criterion:
/// - `has_*` flags require the field to be present / non-empty,
/// - `missing_*` flags require the field to be absent / empty,
/// - when `filter.tags` is set, the entry must carry at least one of them.
///
/// The relative order of surviving entries is preserved. When `filter.limit`
/// is set, the result is truncated to that many entries AFTER filtering.
pub(crate) fn apply_entry_filters(
    entries: Vec<knowledge::KnowledgeEntry>,
    filter: &EntryFilter,
) -> Vec<knowledge::KnowledgeEntry> {
    // One predicate with guard clauses; each guard rejects an entry that
    // violates an active criterion.
    let keep = |entry: &knowledge::KnowledgeEntry| -> bool {
        if filter.has_wake_phrase && !entry.has_any_wake_phrase() {
            return false;
        }
        if filter.missing_wake_phrase && entry.has_any_wake_phrase() {
            return false;
        }
        if filter.has_anchors && entry.anchors.is_empty() {
            return false;
        }
        if filter.missing_anchors && !entry.anchors.is_empty() {
            return false;
        }

        // A resonance type counts as "set" only when present AND non-empty.
        let resonance_set = entry
            .resonance_type
            .as_ref()
            .is_some_and(|s| !s.is_empty());
        if filter.has_resonance_type && !resonance_set {
            return false;
        }
        if filter.missing_resonance_type && resonance_set {
            return false;
        }

        // No tag filter means "accept"; otherwise any single overlap suffices.
        match filter.tags.as_ref() {
            None => true,
            Some(wanted) => wanted.iter().any(|t| entry.tags.contains(t)),
        }
    };

    let mut kept: Vec<_> = entries.into_iter().filter(keep).collect();

    // The limit is applied last so it counts matching entries only.
    if let Some(n) = filter.limit {
        kept.truncate(n);
    }

    kept
}

/// Canonicalize a knowledge entry ID to its `kn-`-prefixed form.
///
/// IDs that already start with `kn-` are returned unchanged; anything else
/// (including the empty string) gets `kn-` prepended. The check is
/// case-sensitive.
pub(crate) fn normalize_id(id: &str) -> String {
    match id.strip_prefix("kn-") {
        Some(_) => id.to_owned(),
        None => ["kn-", id].concat(),
    }
}

/// Routing target for a single fact type: the category the fact is stored
/// under and the tags attached to it. Produced by `route_fact_type`.
pub(crate) struct FactRouting {
    /// Name of the category the fact is routed to (e.g. `"thread"`).
    pub(crate) category: &'static str,
    /// Tags to attach to the routed fact; may be empty.
    pub(crate) tags: Vec<&'static str>,
}

/// Look up the ID of an open thread whose body matches `content`.
///
/// Both the candidate body and `content` are passed through
/// `KnowledgeEntry::normalize_content` before comparison, so differences in
/// whitespace/formatting do not prevent a match. Threads are listed from the
/// `"thread"` category using a context built for `agent_id`.
///
/// A thread counts as open when its summary state is `"open"` OR when it has
/// no summary state at all: the close handler always writes state, so absence
/// implies never-closed (pre-convention threads).
///
/// # Errors
/// Propagates any store error from listing threads, and fails with
/// "No open thread found matching content" when nothing matches.
pub(crate) fn find_open_thread_by_content(
    db: &dyn store::KnowledgeStore,
    content: &str,
    agent_id: &str,
) -> Result<String> {
    use crate::knowledge::KnowledgeEntry;

    let agent_ctx = store::AgentContext::for_agent(agent_id);
    let thread_filter = store::KnowledgeFilter {
        categories: Some(vec!["thread".to_string()]),
        ..Default::default()
    };

    let candidates = db.list_by_category("thread", &agent_ctx, &thread_filter)?;
    let wanted = KnowledgeEntry::normalize_content(content);

    // First thread (in listing order) that is open and whose normalized body
    // equals the normalized search content.
    let hit = candidates.into_iter().find(|thread| {
        // Missing state is treated like "open" (pre-convention threads).
        let open = matches!(
            thread.get_summary_state().as_deref(),
            None | Some("open")
        );
        open && thread
            .body
            .as_ref()
            .is_some_and(|body| KnowledgeEntry::normalize_content(body) == wanted)
    });

    match hit {
        Some(thread) => Ok(thread.id),
        None => bail!("No open thread found matching content: '{}'", content),
    }
}

/// Map a fact type name to the category and tags it should be stored with.
///
/// NOTE: The category targets used here (decision, insight, reference, thread)
/// map to the default seed categories in schema/surrealdb-schema.surql. Custom
/// deployments that rename or remove these seed categories must update this
/// routing table accordingly.
///
/// # Errors
/// Fails with a message listing every valid fact type when `fact_type` is not
/// one of them. Matching is exact and case-sensitive.
pub(crate) fn route_fact_type(fact_type: &str) -> Result<FactRouting> {
    // Listed in the order shown to the user in the error message.
    const VALID_FACT_TYPES: &[&str] = &[
        "decision",
        "insight",
        "person",
        "quote",
        "thread_opened",
        "commitment",
        "thread_closed",
    ];

    // Resolve the (category, tags) pair first, then build the struct once.
    let (category, tags): (&'static str, Vec<&'static str>) = match fact_type {
        "decision" => ("decision", vec![]),
        "insight" => ("insight", vec![]),
        "person" => ("reference", vec!["person"]),
        "quote" => ("reference", vec!["quote"]),
        "thread_opened" => ("thread", vec!["question"]),
        "commitment" => ("thread", vec!["commitment"]),
        "thread_closed" => ("thread", vec![]),
        unknown => bail!(
            "Invalid fact type '{}'. Valid types: {}",
            unknown,
            VALID_FACT_TYPES.join(", ")
        ),
    };

    Ok(FactRouting { category, tags })
}

/// Build the agent context for a read, from `MX_CURRENT_AGENT` and the
/// `--mine` / `--include-private` flags.
///
/// - `MX_CURRENT_AGENT` unset or empty: public-only context, flags ignored.
/// - Agent set and either flag given: `AgentContext::for_agent`.
/// - Agent set and no flag given: `AgentContext::public_for_agent`.
///
/// NOTE(review): `--mine` ("only private entries owned by this agent") and
/// `--include-private` ("public + own private entries") produce the same
/// context here; any narrowing for `--mine` presumably happens at the call
/// site — confirm.
pub(crate) fn resolve_agent_context(mine: bool, include_private: bool) -> store::AgentContext {
    // Without a named agent there is nothing private to expose.
    let agent = match std::env::var("MX_CURRENT_AGENT") {
        Ok(name) if !name.is_empty() => name,
        _ => return store::AgentContext::public_only(),
    };

    if mine || include_private {
        store::AgentContext::for_agent(agent)
    } else {
        // default: only show public entries
        store::AgentContext::public_for_agent(agent)
    }
}

/// Similarity threshold above which two entries are considered near-duplicates
/// and should NOT be anchored together. Used in both the batch `AutoAnchor`
/// handler and the per-entry `auto_anchor` helper. The comparison in
/// `auto_anchor` is inclusive: a score exactly equal to this value is still
/// anchor-worthy.
pub(crate) const NEAR_DUPLICATE_CEILING: f32 = 0.95;

/// Default minimum similarity for two entries to be considered anchor-worthy.
/// Together with `NEAR_DUPLICATE_CEILING` this forms the inclusive anchoring
/// band `[0.75, 0.95]` used by `auto_anchor`.
pub(crate) const DEFAULT_ANCHOR_THRESHOLD: f32 = 0.75;

/// Over-fetch factor for `auto_anchor`'s bounded candidate query (Issue #362):
/// we fetch `max_anchors * ANCHOR_CANDIDATE_OVERFETCH` rows by score (where
/// `max_anchors` is the local cap inside `auto_anchor`, currently 5, giving 25
/// rows), leaving headroom for the handful of high-scoring rows that get
/// filtered out (self, near-duplicates above the ceiling, existing/removed
/// anchors) before we run out of in-band candidates.
pub(crate) const ANCHOR_CANDIDATE_OVERFETCH: usize = 5;

/// Escalation cap for `auto_anchor`'s candidate query. When the normal
/// over-fetch is *saturated* in a way that could truncate genuine in-band
/// anchors (see the saturation signal at the call site, Issue #362 / PR #366),
/// we re-query at this much larger bound so the selected anchors provably match
/// the old exhaustive full-scan behavior. A single re-query at this cap is still
/// far cheaper than the old per-write full hydrate + Rust cosine loop, and it
/// only fires in the degenerate near-duplicate-flood case.
pub(crate) const MAX_ANCHOR_CANDIDATES: usize = 500;

/// Calculate cosine similarity between two vectors.
///
/// Returns a value in `[-1.0, 1.0]` (typically 0.0 to 1.0 for normalized
/// embeddings).
///
/// Degenerate inputs yield `0.0` rather than an error or NaN:
/// - slices of different lengths,
/// - either vector having zero magnitude (including two empty slices).
///
/// The quotient is clamped to `[-1.0, 1.0]`: with `f32` rounding, the product
/// of two square roots can come out marginally smaller than the dot product,
/// which would otherwise let (near-)parallel vectors score just above `1.0`
/// and violate the documented range.
pub(crate) fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }

    let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
    let magnitude_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let magnitude_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();

    // Direction is undefined for a zero vector; report "no similarity".
    if magnitude_a == 0.0 || magnitude_b == 0.0 {
        return 0.0;
    }

    // Clamp away rounding overshoot so callers can rely on the documented range.
    (dot_product / (magnitude_a * magnitude_b)).clamp(-1.0, 1.0)
}

/// Compute and persist embeddings for a knowledge entry after add/update.
///
/// The entry's embedding text is split with `chunk_text` using the default
/// `ChunkConfig`:
/// - exactly one chunk (short entries, <=400 tokens per the chunking
///   convention): a single embedding is stored on the entry, `chunk_count` is
///   set to 0 and any previously stored chunks are deleted;
/// - otherwise (long entries): every chunk is embedded, the entry's rows in
///   the `embedding_chunk` table are replaced, and an L2-normalized mean of
///   the chunk vectors is stored on the entry for auto_anchor compatibility.
///
/// The entry is looked up in the context derived from `MX_CURRENT_AGENT`
/// (public-only when unset/empty). If it cannot be found the function returns
/// `Ok(())` without doing anything.
///
/// # Errors
/// Propagates failures from the store, the embedding provider and the
/// tokenizer loader.
pub(crate) fn auto_embed(entry_id: &str, db: &dyn store::KnowledgeStore) -> Result<()> {
    use crate::chunking::{ChunkConfig, chunk_text};
    use crate::embeddings::{EmbeddingProvider, TractProvider};

    let agent_ctx = match std::env::var("MX_CURRENT_AGENT") {
        Ok(agent) if !agent.is_empty() => store::AgentContext::for_agent(agent),
        _ => store::AgentContext::public_only(),
    };

    // Nothing to embed when the entry is not visible in this context.
    let Some(mut entry) = db.get(entry_id, &agent_ctx)? else {
        return Ok(());
    };

    let provider = TractProvider::new()?;
    let text = entry.embedding_text();
    let chunk_config = ChunkConfig::default();
    // Chunking uses load_tokenizer() (no truncation): the provider's own
    // tokenizer truncates at 512 tokens, which would hide everything past that
    // point, and chunking must see ALL tokens to split them correctly.
    let full_tokenizer = crate::embeddings::load_tokenizer()?;
    let chunks = chunk_text(&text, &full_tokenizer, &chunk_config);

    // ---- Short entry: one embedding on the entry, no chunk rows ----------
    if chunks.len() == 1 {
        entry.embedding = Some(provider.embed(&chunks[0].text)?);
        entry.embedding_model = Some(provider.model_id().to_string());
        entry.embedded_at = Some(chrono::Utc::now().to_rfc3339());
        entry.chunk_count = 0;
        entry.updated_at = Some(chrono::Utc::now().to_rfc3339());
        db.upsert_knowledge(&entry)?;
        // Clean up chunks left over from a previous, longer revision.
        db.delete_embedding_chunks(entry_id)?;
        return Ok(());
    }

    // ---- Long entry: embed every chunk before touching the store ---------
    let chunk_embeddings = chunks
        .iter()
        .map(|chunk| provider.embed(&chunk.text))
        .collect::<std::result::Result<Vec<_>, _>>()?;

    // Replace stored chunks (delete-then-insert).
    db.delete_embedding_chunks(entry_id)?;
    for (chunk, embedding) in chunks.iter().zip(&chunk_embeddings) {
        db.insert_embedding_chunk(
            entry_id,
            chunk.chunk_index,
            &chunk.text,
            chunk.token_offset,
            chunk.token_count,
            embedding,
            provider.model_id(),
        )?;
    }

    // Entry-level vector: component-wise mean of the chunk vectors ...
    let mut mean_vec = vec![0.0f32; provider.dimensions()];
    for chunk_vec in &chunk_embeddings {
        for (slot, component) in chunk_vec.iter().enumerate() {
            mean_vec[slot] += component;
        }
    }
    let chunk_total = chunk_embeddings.len() as f32;
    mean_vec.iter_mut().for_each(|c| *c /= chunk_total);

    // ... rescaled to unit length (skipped for an all-zero mean).
    let norm: f32 = mean_vec.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm > 0.0 {
        mean_vec.iter_mut().for_each(|c| *c /= norm);
    }

    entry.embedding = Some(mean_vec);
    entry.embedding_model = Some(provider.model_id().to_string());
    entry.embedded_at = Some(chrono::Utc::now().to_rfc3339());
    entry.chunk_count = chunks.len() as i32;
    entry.updated_at = Some(chrono::Utc::now().to_rfc3339());
    db.upsert_knowledge(&entry)?;

    Ok(())
}

/// Decide whether a mutation (Add/Update/Edit/Append/Prepend/Restore) should
/// run `auto_anchor` synchronously on the write path.
///
/// Returns `false` when EITHER:
///   - the caller passed `--no-auto-anchor` (`no_auto_anchor == true`), or
///   - `MX_SKIP_WRITE_ANCHOR` is set to `1`, or to `true` in any letter case.
///
/// Any other value of the variable (or an unset / non-Unicode value) leaves
/// anchoring enabled. The parsing mirrors the `MX_SKIP_SCHEMA` convention
/// (`connection.rs`) so the project keeps one rule for boolean opt-out flags.
///
/// `MX_SKIP_WRITE_ANCHOR` is a future-facing opt-out that lets a deployment
/// defer anchoring to the explicit `mx memory auto-anchor` batch command
/// (e.g. a nightly cron), which is never gated by this flag.
///
/// Skipping anchoring does NOT affect durability: by the time this gate is
/// evaluated the entry has already been upserted, read-back verified and
/// re-upserted by `auto_embed`, and `auto_anchor`'s own trailing upsert is
/// only an anchor update. Skipping it loses anchors-for-this-write, nothing
/// else.
pub(crate) fn write_anchor_enabled(no_auto_anchor: bool) -> bool {
    // The explicit CLI flag wins regardless of the environment.
    if no_auto_anchor {
        return false;
    }

    match std::env::var("MX_SKIP_WRITE_ANCHOR") {
        Ok(value) => !(value == "1" || value.to_lowercase() == "true"),
        // Unset (or not valid Unicode): no opt-out requested.
        Err(_) => true,
    }
}

/// Auto-anchor a knowledge entry after add/update
///
/// This silently finds similar entries and adds anchors for a single entry.
/// Uses defaults: threshold 0.75, max 5 anchors.
///
/// Outline of what the function does:
///   1. Loads the entry in the context derived from `MX_CURRENT_AGENT`
///      (public-only when unset/empty). A missing entry or an entry without an
///      embedding is a silent no-op (`Ok(())`, no write).
///   2. Fetches similarity-scored candidates from the store (bounded top-K,
///      escalating once to `MAX_ANCHOR_CANDIDATES` on saturation).
///   3. Keeps candidates that are not the entry itself, not already anchored,
///      not in `explicitly_removed`, allowed by the privacy rule, and whose
///      score lies in `[DEFAULT_ANCHOR_THRESHOLD, NEAR_DUPLICATE_CEILING]`.
///   4. Re-scores EXISTING anchors by ID and marks those outside the band as
///      stale.
///   5. If there is nothing to add and nothing to prune, returns without
///      writing. Otherwise writes the entry back with stale and explicitly
///      removed anchors dropped, up to 5 new anchors added, the list sorted and
///      de-duplicated, and `updated_at` refreshed.
///
/// # Parameters
/// - `entry_id`: ID of the entry to anchor.
/// - `db`: knowledge store used for reads, the similarity query and the final
///   upsert.
/// - `explicitly_removed`: anchor IDs the user removed on purpose; they are
///   never re-added and are stripped from the stored anchor list whenever the
///   entry is written back.
///
/// # Errors
/// Propagates errors from `db.get`, `db.semantic_search_entries_scored` and
/// `db.upsert_knowledge`.
pub(crate) fn auto_anchor(
    entry_id: &str,
    db: &dyn store::KnowledgeStore,
    explicitly_removed: Option<&[String]>,
) -> Result<()> {
    // Get agent context for fetching entries
    let ctx = match std::env::var("MX_CURRENT_AGENT") {
        Ok(agent) if !agent.is_empty() => store::AgentContext::for_agent(agent),
        _ => store::AgentContext::public_only(),
    };

    // Fetch the entry
    let entry = match db.get(entry_id, &ctx)? {
        Some(e) => e,
        None => return Ok(()), // Entry not found, skip silently
    };

    // Skip if no embedding
    if entry.embedding.is_none() {
        return Ok(());
    }

    // Cannot panic: the `is_none()` guard above already returned for entries
    // without an embedding.
    let entry_embedding = entry.embedding.as_ref().unwrap();

    let threshold = DEFAULT_ANCHOR_THRESHOLD;
    let max_anchors = 5;

    // ---- Candidate fetch (Issue #362) -----------------------------------
    //
    // Previously this hydrated the ENTIRE graph (`db.list_all`) and ran a Rust
    // cosine loop over every embedded entry on every write — O(n) in the
    // application layer, the dominant cost of a save (~15.6s of ~16s).
    //
    // Instead we ask the DB for the top-K most similar entries by cosine score,
    // scored against this entry's own embedding (entry-level mean vector only —
    // see `semantic_search_entries_scored`, which deliberately ignores
    // chunk-level matching so anchoring keeps strict entry-level-mean semantics
    // and selects the SAME anchors the old full scan would have). The band
    // filter, privacy filter, self-exclusion and max_anchors cap below are
    // applied unchanged — only the candidate SOURCE moved from full-scan to
    // bounded DB query.
    //
    // Over-fetch factor: the band is [0.75, 0.95]. The top-K-by-score query
    // returns the highest scores first, so the slots ahead of an in-band
    // candidate can be consumed by: self (~1.0), near-duplicates (>0.95),
    // existing anchors (re-handled separately below), and explicitly-removed
    // anchors. Over-fetching 5x max_anchors (25) leaves ample headroom for that
    // handful of high-scoring rejects before we run out of in-band candidates.
    let candidate_fetch_k = max_anchors * ANCHOR_CANDIDATE_OVERFETCH;
    let mut scored_candidates =
        db.semantic_search_entries_scored(entry_embedding, &ctx, candidate_fetch_k)?;

    // ---- Saturation detection + escalation (PR #366, hardening #362) -----
    //
    // The bounded top-K fetch diverges from the old exhaustive scan in EXACTLY
    // one degenerate case: if MORE than (K - max_anchors) rows score above an
    // in-band member (e.g. >20 near-identical copies above the 0.95 ceiling),
    // a legitimate in-band anchor could rank below K and be silently dropped —
    // a behavior change vs. the old full scan.
    //
    // We detect this with an EXACT signal. We are only at risk of having
    // truncated in-band rows when BOTH hold:
    //   1. the result is K-saturated (`len == candidate_fetch_k`), i.e. the DB
    //      had at least K candidates and the query hit its bound; AND
    //   2. the lowest-scoring returned candidate still scores at/above the band
    //      floor (>= threshold) — so scores had NOT yet dropped below 0.75 when
    //      the bound cut us off, meaning additional in-band rows may exist past K.
    //
    // This is exact: if the lowest returned score is already below the floor,
    // every row beyond K scores even lower (results are score-descending) and is
    // therefore out-of-band — nothing the old scan would have kept was missed.
    // Likewise, if the result is not saturated, the DB returned every candidate
    // it had, identical to the full scan's candidate universe.
    //
    // On the saturated signal we ESCALATE: re-query at MAX_ANCHOR_CANDIDATES and
    // proceed with that fuller set. This stays entirely on the bounded DB path
    // (no per-write full hydrate) and only triggers in the degenerate flood.
    let saturated = scored_candidates.len() == candidate_fetch_k
        && scored_candidates
            .last()
            .is_some_and(|(_, score)| *score >= threshold);
    if saturated {
        // The escalated result replaces (not extends) the first result set.
        scored_candidates =
            db.semantic_search_entries_scored(entry_embedding, &ctx, MAX_ANCHOR_CANDIDATES)?;
    }

    // (candidate id, score) pairs that passed every filter below.
    let mut similarities: Vec<(String, f32)> = Vec::new();

    for (candidate, similarity) in &scored_candidates {
        // Skip self
        if candidate.id == entry.id {
            continue;
        }

        // Existing anchors are NOT considered as new candidates here. Their
        // staleness is re-evaluated separately below (by-ID recompute) so we
        // never depend on them appearing in the bounded top-K query.
        if entry.anchors.contains(&candidate.id) {
            continue;
        }

        // Skip anchors that the user explicitly removed via --anchors replacement.
        // auto_anchor is a safety net for missed connections, not an override of
        // explicit user intent.
        //
        // Defensive: current callers (Add, Update) already strip explicitly-removed
        // anchors before reaching this loop, but future call sites might not. This
        // guard ensures auto_anchor never re-adds an anchor the user chose to remove,
        // regardless of how the caller is wired.
        if let Some(removed) = explicitly_removed
            && removed.contains(&candidate.id)
        {
            continue;
        }

        // Privacy check
        let can_anchor = if entry.visibility == "private" {
            // Private can anchor to same-owner private OR public
            candidate.visibility == "public"
                || (candidate.visibility == "private" && candidate.owner == entry.owner)
        } else {
            // Public can only anchor to public
            candidate.visibility == "public"
        };

        if !can_anchor {
            continue;
        }

        // Filter by threshold, skip near-duplicates. The score is the same cosine
        // value the old Rust loop computed; we use the DB-computed score directly.
        // Both band edges are inclusive.
        if *similarity >= threshold && *similarity <= NEAR_DUPLICATE_CEILING {
            similarities.push((candidate.id.clone(), *similarity));
        }
    }

    // ---- #199 stale-anchor pruning --------------------------------------
    //
    // Re-evaluate EXISTING anchors and prune any that have dropped out of the
    // band on the current embeddings. Existing anchors won't necessarily appear
    // in the bounded top-K similarity query (they may now score below K, which
    // is precisely why they're stale), so we fetch each existing anchor's
    // embedding BY ID — a bounded handful — and recompute similarity directly
    // with `cosine_similarity`, exactly as the old full-scan path did. This
    // preserves #199 behavior with no dependency on the top-K candidate set.
    let mut stale_anchors: Vec<String> = Vec::new();
    for anchor_id in &entry.anchors {
        // Skip a degenerate self-anchor: never treat the entry's own id as stale
        // here (matches the old loop, which `continue`d on self before staleness).
        if *anchor_id == entry.id {
            continue;
        }
        let anchor_entry = match db.get(anchor_id, &ctx)? {
            Some(e) => e,
            // Anchor target no longer visible/exists: leave the existing
            // behavior untouched (old code only saw it if list_all returned it;
            // if it didn't, it wasn't pruned). Don't prune on absence.
            None => continue,
        };
        let Some(anchor_embedding) = anchor_entry.embedding.as_ref() else {
            // No embedding to compare against — old loop filtered these out of
            // `candidates`, so it never marked them stale. Preserve that.
            continue;
        };
        let similarity = cosine_similarity(entry_embedding, anchor_embedding);
        // Stale = outside the inclusive band, on either side.
        if similarity < threshold || similarity > NEAR_DUPLICATE_CEILING {
            stale_anchors.push(anchor_id.clone());
        }
    }

    // No similar entries found and no stale anchors to prune
    // (returns without any write, so `updated_at` is left untouched).
    if stale_anchors.is_empty() && similarities.is_empty() {
        return Ok(());
    }

    // Sort by similarity (descending) and take top N.
    // Incomparable scores (NaN) are treated as equal rather than panicking.
    // The `max_anchors` cap applies to NEW matches from this call only;
    // surviving existing anchors are kept in addition to them.
    similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
    let top_matches: Vec<String> = similarities
        .into_iter()
        .take(max_anchors)
        .map(|(id, _)| id)
        .collect();

    // Update the entry with new anchors, filtering out stale ones
    let mut updated_anchors: Vec<String> = entry
        .anchors
        .clone()
        .into_iter()
        .filter(|a| !stale_anchors.contains(a))
        .collect();

    // Also drop anything the user explicitly removed that is still stored.
    if let Some(removed) = explicitly_removed {
        updated_anchors.retain(|a| !removed.contains(a));
    }

    // Merge, then sort + dedup so the stored list is ordered and duplicate-free.
    updated_anchors.extend(top_matches);
    updated_anchors.sort();
    updated_anchors.dedup();

    // Create updated entry
    let mut updated_entry = entry.clone();
    updated_entry.anchors = updated_anchors;
    updated_entry.updated_at = Some(chrono::Utc::now().to_rfc3339());

    // Save to database
    db.upsert_knowledge(&updated_entry)?;

    Ok(())
}

/// Opens the SurrealDB graph store associated with `config`.
///
/// The store location is `config.db_path` with its extension replaced by
/// `surreal`; `verbose` is passed straight through to
/// `SurrealDatabase::open_with_verbose`, whose result is returned unchanged.
pub(crate) fn open_surreal(config: &IndexConfig, verbose: bool) -> Result<SurrealDatabase> {
    SurrealDatabase::open_with_verbose(config.db_path.with_extension("surreal"), verbose)
}

#[cfg(test)]
mod auto_anchor_tests {
    //! Tests for `auto_anchor` after the Issue #362 rewrite (DB-side bounded
    //! candidate fetch replacing `list_all` + full-graph Rust cosine loop).
    //!
    //! The headline guarantee these tests defend: anchoring CORRECTNESS is
    //! unchanged — the band filter, privacy filter, self-exclusion, max_anchors
    //! cap and #199 stale-anchor pruning all behave exactly as the old full
    //! scan did; only the candidate SOURCE moved to the bounded DB query.
    //!
    //! Env-var sensitive (`MX_CURRENT_AGENT` drives the agent context), so these
    //! are `#[serial]` and reset the var deterministically.

    use super::*;
    use crate::knowledge::KnowledgeEntry;
    use crate::store::{AgentContext, KnowledgeStore};
    use serial_test::serial;

    /// Builds the 4-dimensional vector `[cos, sin, 0, 0]` where
    /// `sin = sqrt(max(0, 1 - cos^2))`.
    ///
    /// For `cos` in `[-1, 1]` the result has unit length, so its cosine
    /// similarity with `[1, 0, 0, 0]` (the direction returned by
    /// `unit_query()`) equals the dot product, i.e. `cos`, up to f32 rounding.
    fn unit_vec(cos: f32) -> Vec<f32> {
        // Clamp at zero before taking the root so a slightly negative
        // remainder can never turn into NaN.
        let sin_squared = 1.0 - cos * cos;
        let sin = f32::max(sin_squared, 0.0).sqrt();
        [cos, sin, 0.0, 0.0].to_vec()
    }

    /// The query/reference direction used throughout these tests: the first
    /// basis vector `[1, 0, 0, 0]`.
    fn unit_query() -> Vec<f32> {
        let mut direction = vec![0.0; 4];
        direction[0] = 1.0;
        direction
    }

    /// Builds a fully-populated `KnowledgeEntry` fixture for the anchoring
    /// tests.
    ///
    /// Only `id`, `embedding`, `visibility`, `owner` and `anchors` vary per
    /// call. The title and content hash are derived from `id`, timestamps are
    /// taken at call time, and every remaining field gets a fixed placeholder.
    fn entry_with_embedding(
        id: &str,
        embedding: Vec<f32>,
        visibility: &str,
        owner: Option<&str>,
        anchors: Vec<String>,
    ) -> KnowledgeEntry {
        let stamp = chrono::Utc::now().to_rfc3339();
        KnowledgeEntry {
            id: id.to_owned(),
            category_id: String::from("test"),
            title: format!("Entry {id}"),
            body: Some(String::from("body")),
            summary: None,
            applicability: Vec::new(),
            source_project_id: None,
            source_agent_id: None,
            file_path: None,
            tags: Vec::new(),
            created_at: Some(stamp.clone()),
            updated_at: Some(stamp.clone()),
            content_hash: Some(format!("hash-{id}")),
            source_type_id: Some(String::from("manual")),
            entry_type_id: Some(String::from("primary")),
            session_id: None,
            ephemeral: false,
            content_type_id: Some(String::from("text")),
            owner: owner.map(String::from),
            visibility: visibility.to_owned(),
            resonance: 5,
            resonance_type: Some(String::from("ephemeral")),
            last_activated: Some(stamp),
            activation_count: 0,
            decay_rate: 0.0,
            anchors,
            wake_phrases: Vec::new(),
            triggers: Vec::new(),
            wake_order: None,
            wake_phrase: None,
            embedding: Some(embedding),
            embedding_model: Some(String::from("test-model")),
            // Taken separately from `stamp`, which has been moved by now.
            embedded_at: Some(chrono::Utc::now().to_rfc3339()),
            chunk_count: 0,
            format: String::from("markdown"),
            effective_resonance: None,
        }
    }

    /// Removes `MX_CURRENT_AGENT` from the process environment so that
    /// `auto_anchor` runs with the `public_only` agent context.
    fn clear_agent_env() {
        // SAFETY: this mutates process-wide environment state; the tests that
        // call it are `#[serial]`, which serializes them against each other.
        unsafe { std::env::remove_var("MX_CURRENT_AGENT") };
    }

    /// Test oracle reproducing the OLD candidate selection: an exhaustive scan
    /// over `all`, keeping entries that
    ///
    /// * are not the target itself and are not already among its anchors,
    /// * carry an embedding,
    /// * pass the privacy rule (public candidates always; private candidates
    ///   only for a private target with the same owner), and
    /// * score inside `[DEFAULT_ANCHOR_THRESHOLD, NEAR_DUPLICATE_CEILING]`.
    ///
    /// The survivors are ranked by descending score and cut to `max_anchors`.
    /// The returned ids are sorted alphabetically. Stale-anchor pruning is NOT
    /// modelled here (it is tested separately); this only yields the NEW
    /// anchors the old code would have added.
    ///
    /// Panics if `target` has no embedding.
    fn reference_old_anchors(
        target: &KnowledgeEntry,
        all: &[KnowledgeEntry],
        max_anchors: usize,
    ) -> Vec<String> {
        let floor = DEFAULT_ANCHOR_THRESHOLD;
        let query = target.embedding.as_ref().unwrap();
        let target_is_private = target.visibility == "private";

        let mut scored: Vec<(String, f32)> = all
            .iter()
            .filter(|cand| cand.id != target.id && !target.anchors.contains(&cand.id))
            .filter_map(|cand| {
                let cand_emb = cand.embedding.as_ref()?;
                // Public candidates are always eligible; private ones only when
                // the target is private too and shares the owner.
                let eligible = cand.visibility == "public"
                    || (target_is_private
                        && cand.visibility == "private"
                        && cand.owner == target.owner);
                if !eligible {
                    return None;
                }
                let score = cosine_similarity(query, cand_emb);
                (score >= floor && score <= NEAR_DUPLICATE_CEILING)
                    .then(|| (cand.id.clone(), score))
            })
            .collect();

        // Best score first; incomparable scores (NaN) are treated as ties.
        scored.sort_by(|lhs, rhs| {
            rhs.1
                .partial_cmp(&lhs.1)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        scored.truncate(max_anchors);

        let mut ids: Vec<String> = scored.into_iter().map(|(id, _)| id).collect();
        ids.sort();
        ids
    }

    /// Reads entry `id` back through the `public_only` context and returns its
    /// anchor ids in sorted order.
    ///
    /// Panics if the lookup fails or the entry is not found.
    fn anchors_of(db: &dyn KnowledgeStore, id: &str) -> Vec<String> {
        let public = AgentContext::public_only();
        let entry = db.get(id, &public).unwrap().unwrap();
        let mut anchors = entry.anchors;
        anchors.sort();
        anchors
    }

    /// Equivalence check against the pre-rewrite behavior: on a small graph
    /// covering in-band, near-duplicate and below-threshold candidates,
    /// `auto_anchor` must persist exactly the anchors that
    /// `reference_old_anchors` (the old full-scan selection) picks.
    #[test]
    #[serial]
    fn auto_anchor_picks_same_anchors_as_old_full_scan() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        // Every candidate is public and unowned; only the id and the cosine
        // against the target's [1,0,0,0] direction vary.
        let candidate =
            |id: &str, cos: f32| entry_with_embedding(id, unit_vec(cos), "public", None, vec![]);

        let target = entry_with_embedding("kn-target", unit_query(), "public", None, vec![]);
        let graph = vec![
            target.clone(),
            // Exactly five in-band candidates: together they fill the cap of 5
            // passed to the reference implementation below, none is cut.
            candidate("kn-a", 0.90),
            candidate("kn-b", 0.85),
            candidate("kn-c", 0.80),
            candidate("kn-d", 0.78),
            candidate("kn-e", 0.76),
            candidate("kn-dup", 0.97), // above the near-duplicate ceiling
            candidate("kn-far", 0.60), // below the threshold
        ];
        for entry in graph.iter() {
            db.upsert_knowledge(entry).unwrap();
        }

        auto_anchor("kn-target", &db, None).unwrap();

        let persisted = anchors_of(&db, "kn-target");
        assert_eq!(
            persisted,
            reference_old_anchors(&target, &graph, 5),
            "rewrite must select the same anchors as the old full scan"
        );

        // Concretely: the five in-band entries; dup/far excluded, self excluded.
        let in_band: Vec<String> = ["kn-a", "kn-b", "kn-c", "kn-d", "kn-e"]
            .iter()
            .map(|id| String::from(*id))
            .collect();
        assert_eq!(persisted, in_band);
    }

    /// The similarity band is enforced on both ends: of three candidates (one
    /// above the ceiling, one below the floor, one in between) only the middle
    /// one may end up as an anchor.
    #[test]
    #[serial]
    fn band_filter_excludes_near_duplicates_and_below_threshold() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        let target = entry_with_embedding("kn-t", unit_query(), "public", None, vec![]);
        db.upsert_knowledge(&target).unwrap();

        // One candidate per region: near-duplicate, too far, in band.
        for (id, cos) in [("kn-dup", 0.99_f32), ("kn-low", 0.50), ("kn-mid", 0.85)] {
            let candidate = entry_with_embedding(id, unit_vec(cos), "public", None, vec![]);
            db.upsert_knowledge(&candidate).unwrap();
        }

        auto_anchor("kn-t", &db, None).unwrap();

        assert_eq!(
            anchors_of(&db, "kn-t"),
            vec![String::from("kn-mid")],
            "only the in-band entry is anchored; near-dup (>0.95) and below-threshold (<0.75) excluded"
        );
    }

    /// With more in-band candidates than the cap allows, only the five
    /// best-scoring ones are anchored.
    #[test]
    #[serial]
    fn max_anchors_cap_respected() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        let target = entry_with_embedding("kn-t", unit_query(), "public", None, vec![]);
        db.upsert_knowledge(&target).unwrap();

        // Seven in-band candidates, `kn-c0` (best) through `kn-c6` (worst).
        let scores: [f32; 7] = [0.94, 0.92, 0.90, 0.88, 0.86, 0.84, 0.82];
        for (rank, score) in scores.iter().copied().enumerate() {
            let id = format!("kn-c{rank}");
            let candidate = entry_with_embedding(&id, unit_vec(score), "public", None, vec![]);
            db.upsert_knowledge(&candidate).unwrap();
        }

        auto_anchor("kn-t", &db, None).unwrap();
        let got = anchors_of(&db, "kn-t");

        assert_eq!(got.len(), 5, "cap at max_anchors = 5");
        // The 5 highest-scoring band members are ranks 0 through 4.
        let top_five: Vec<String> = (0..5).map(|rank| format!("kn-c{rank}")).collect();
        assert_eq!(got, top_five);
    }

    /// #199: an existing anchor that has drifted below the threshold must be
    /// pruned even though it will not appear in the top-K similarity query,
    /// while an existing anchor that is still in band must survive.
    #[test]
    #[serial]
    fn stale_anchor_pruned_via_by_id_recompute() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        // Upserts a public, unowned entry at the given cosine to the target.
        let seed = |id: &str, cos: f32| {
            let entry = entry_with_embedding(id, unit_vec(cos), "public", None, vec![]);
            db.upsert_knowledge(&entry).unwrap();
        };

        // The target starts out anchored to both "kn-stale" and "kn-keep".
        let target = entry_with_embedding(
            "kn-t",
            unit_query(),
            "public",
            None,
            vec!["kn-stale".to_string(), "kn-keep".to_string()],
        );
        db.upsert_knowledge(&target).unwrap();

        // Still in band -> must survive re-evaluation.
        seed("kn-keep", 0.88);
        // Far from the target (cos 0.40, below the 0.75 floor) -> must go.
        seed("kn-stale", 0.40);
        // Many closer in-band neighbors, so kn-stale ranks well below any
        // top-K cutoff: only the by-ID recompute can be what prunes it.
        for i in 0..10 {
            seed(&format!("kn-n{i}"), 0.80 + i as f32 * 0.001);
        }

        auto_anchor("kn-t", &db, None).unwrap();
        let got = anchors_of(&db, "kn-t");

        assert!(
            got.iter().all(|anchor| anchor.as_str() != "kn-stale"),
            "stale anchor below threshold must be pruned (by-ID recompute)"
        );
        assert!(
            got.iter().any(|anchor| anchor.as_str() == "kn-keep"),
            "in-band existing anchor must be preserved"
        );
    }

    /// #199, upper end of the band: an existing anchor whose similarity now
    /// exceeds the near-duplicate ceiling is pruned as well.
    #[test]
    #[serial]
    fn near_duplicate_existing_anchor_is_pruned() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        // The target is already anchored to an entry that sits at cos 0.98.
        let target = entry_with_embedding(
            "kn-t",
            unit_query(),
            "public",
            None,
            vec!["kn-toodup".to_string()],
        );
        db.upsert_knowledge(&target).unwrap();

        let too_close = entry_with_embedding("kn-toodup", unit_vec(0.98), "public", None, vec![]);
        db.upsert_knowledge(&too_close).unwrap();

        auto_anchor("kn-t", &db, None).unwrap();

        let got = anchors_of(&db, "kn-t");
        assert!(
            got.iter().all(|anchor| anchor.as_str() != "kn-toodup"),
            "existing anchor above the near-duplicate ceiling must be pruned"
        );
    }

    /// The target entry must never show up in its own anchor list.
    #[test]
    #[serial]
    fn self_is_never_anchored() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        let solo = entry_with_embedding("kn-solo", unit_query(), "public", None, vec![]);
        // One in-band neighbor, so the anchoring pass has something to select.
        let neighbor = entry_with_embedding("kn-near", unit_vec(0.85), "public", None, vec![]);
        for entry in [&solo, &neighbor] {
            db.upsert_knowledge(entry).unwrap();
        }

        auto_anchor("kn-solo", &db, None).unwrap();

        let got = anchors_of(&db, "kn-solo");
        assert!(
            got.iter().all(|anchor| anchor.as_str() != "kn-solo"),
            "an entry must never anchor to itself (self-similarity ~1.0 excluded)"
        );
        assert_eq!(got, vec![String::from("kn-near")]);
    }

    /// Privacy is preserved: a public entry never anchors to a private one,
    /// even when the private entry would be in band by score. (Under the
    /// `public_only` context the private row is expected to be filtered out
    /// before it can become a candidate.)
    #[test]
    #[serial]
    fn public_entry_does_not_anchor_to_private() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        let fixtures = [
            // The public target.
            entry_with_embedding("kn-pub", unit_query(), "public", None, vec![]),
            // Private candidate that WOULD be in band by score.
            entry_with_embedding(
                "kn-priv",
                unit_vec(0.90),
                "private",
                Some("agent-x"),
                vec![],
            ),
            // Public in-band candidate, so the run produces something.
            entry_with_embedding("kn-pub2", unit_vec(0.85), "public", None, vec![]),
        ];
        for entry in fixtures.iter() {
            db.upsert_knowledge(entry).unwrap();
        }

        auto_anchor("kn-pub", &db, None).unwrap();

        let got = anchors_of(&db, "kn-pub");
        assert!(
            got.iter().all(|anchor| anchor.as_str() != "kn-priv"),
            "public entry must not anchor to a private entry"
        );
        assert_eq!(got, vec![String::from("kn-pub2")]);
    }

    /// An id passed in the explicitly-removed list must not be re-added by
    /// `auto_anchor`, even though it is in band; other in-band candidates are
    /// still anchored.
    #[test]
    #[serial]
    fn explicitly_removed_anchor_not_readded() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        let target = entry_with_embedding("kn-t", unit_query(), "public", None, vec![]);
        db.upsert_knowledge(&target).unwrap();
        // Both candidates are in band; only the first is on the removed list.
        for (id, cos) in [("kn-removed", 0.90_f32), ("kn-other", 0.85)] {
            let candidate = entry_with_embedding(id, unit_vec(cos), "public", None, vec![]);
            db.upsert_knowledge(&candidate).unwrap();
        }

        let removed = vec!["kn-removed".to_string()];
        auto_anchor("kn-t", &db, Some(&removed)).unwrap();

        let got = anchors_of(&db, "kn-t");
        assert!(
            got.iter().all(|anchor| anchor.as_str() != "kn-removed"),
            "auto_anchor must not re-add an explicitly removed anchor"
        );
        assert_eq!(got, vec![String::from("kn-other")]);
    }

    /// A target without an embedding gets no anchors, even when an in-band
    /// candidate exists in the store.
    #[test]
    #[serial]
    fn no_embedding_skips_anchoring() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        // Same fixture as everywhere else, but with the embedding stripped
        // (simulates the opt-out / un-embedded path).
        let target = KnowledgeEntry {
            embedding: None,
            ..entry_with_embedding("kn-noemb", unit_query(), "public", None, vec![])
        };
        db.upsert_knowledge(&target).unwrap();

        let candidate = entry_with_embedding("kn-x", unit_vec(0.90), "public", None, vec![]);
        db.upsert_knowledge(&candidate).unwrap();

        auto_anchor("kn-noemb", &db, None).unwrap();

        let got = anchors_of(&db, "kn-noemb");
        assert!(got.is_empty(), "no embedding -> no anchoring");
    }

    /// PR #366 hardening: the near-duplicate flood.
    ///
    /// When more than (K - max_anchors) entries score above the band ceiling,
    /// the initial bounded top-K (K = max_anchors * ANCHOR_CANDIDATE_OVERFETCH)
    /// is filled almost entirely by out-of-band near-duplicates and the
    /// genuine in-band anchors fall past slot K. Without escalation they would
    /// be silently dropped compared with the old exhaustive scan. With
    /// saturation detection + escalation to MAX_ANCHOR_CANDIDATES,
    /// `auto_anchor` must still select exactly the in-band anchors that the
    /// full-scan reference implementation selects.
    #[test]
    #[serial]
    fn escalates_when_saturated_by_near_duplicate_flood() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        // The over-fetch K used for the initial bounded query.
        let max_anchors = 5usize;
        let k = max_anchors * ANCHOR_CANDIDATE_OVERFETCH; // 25

        let public =
            |id: &str, cos: f32| entry_with_embedding(id, unit_vec(cos), "public", None, vec![]);

        let target = entry_with_embedding("kn-target", unit_query(), "public", None, vec![]);
        let mut graph = vec![target.clone()];

        // Flood: K-1 near-duplicates ABOVE the 0.95 ceiling, with distinct
        // scores in (0.95, 1.0) so the ordering is deterministic. They are not
        // anchorable, but they crowd the score-descending top-K and leave a
        // single slot for an in-band entry in the initial fetch.
        let flood = k - 1; // 24 > (K - max_anchors) = 20
        graph.extend(
            (0..flood).map(|i| public(&format!("kn-dup{i:02}"), 0.999 - (i as f32) * 0.0005)),
        );

        // Genuine in-band anchors. Each scores BELOW every duplicate, so they
        // rank past slot K and would be truncated without escalation, yet they
        // sit well inside the [0.75, 0.95] band.
        let in_band = [
            ("kn-real-a", 0.90f32),
            ("kn-real-b", 0.87),
            ("kn-real-c", 0.84),
            ("kn-real-d", 0.81),
            ("kn-real-e", 0.78),
        ];
        graph.extend(in_band.iter().map(|&(id, cos)| public(id, cos)));

        for entry in graph.iter() {
            db.upsert_knowledge(entry).unwrap();
        }

        // Precondition: the INITIAL bounded fetch is saturated AND its lowest
        // returned score is still at/above the floor, i.e. it would have
        // truncated in-band rows without escalation.
        let ctx = AgentContext::public_only();
        let query = target.embedding.as_ref().unwrap();
        let initial = db.semantic_search_entries_scored(query, &ctx, k).unwrap();
        assert_eq!(initial.len(), k, "initial fetch must be K-saturated");
        assert!(
            initial.last().unwrap().1 >= DEFAULT_ANCHOR_THRESHOLD,
            "lowest returned score must still be >= floor (saturation signal fires)"
        );

        auto_anchor("kn-target", &db, None).unwrap();
        let got = anchors_of(&db, "kn-target");

        // Escalation must recover the full in-band set, matching the
        // exhaustive reference exactly.
        assert_eq!(
            got,
            reference_old_anchors(&target, &graph, max_anchors),
            "escalation must select the same in-band anchors as the old full scan"
        );
        let recovered: Vec<String> = in_band.iter().map(|&(id, _)| String::from(id)).collect();
        assert_eq!(
            got, recovered,
            "all five genuine in-band anchors recovered despite the near-duplicate flood"
        );
    }

    /// PR #366: the saturation signal is EXACT.
    ///
    /// Escalation must NOT fire when the bounded fetch is full but its lowest
    /// returned score is already below the band floor: every row beyond K is
    /// then out-of-band, so the top-K already holds every anchor-worthy
    /// candidate and re-querying would be wasted work in a common shape (many
    /// low-similarity neighbors). This test asserts that correctness holds
    /// without relying on the escalation path.
    #[test]
    #[serial]
    fn does_not_escalate_when_lowest_score_below_floor() {
        clear_agent_env();
        let db = SurrealDatabase::open_in_memory().unwrap();

        let max_anchors = 5usize;
        let k = max_anchors * ANCHOR_CANDIDATE_OVERFETCH; // 25

        let public =
            |id: &str, cos: f32| entry_with_embedding(id, unit_vec(cos), "public", None, vec![]);

        let target = entry_with_embedding("kn-target", unit_query(), "public", None, vec![]);
        let mut graph = vec![target.clone()];

        // A few in-band anchors at the top...
        let in_band = [("kn-a", 0.90f32), ("kn-b", 0.85), ("kn-c", 0.80)];
        graph.extend(in_band.iter().map(|&(id, cos)| public(id, cos)));

        // ...followed by K + 10 neighbors that all sit strictly below the 0.75
        // floor, so the fetch is K-saturated while its tail is out of band.
        graph.extend(
            (0..(k + 10)).map(|i| public(&format!("kn-lo{i:02}"), 0.70 - (i as f32) * 0.001)),
        );

        for entry in graph.iter() {
            db.upsert_knowledge(entry).unwrap();
        }

        // Precondition: K-saturated, but the lowest returned score is BELOW
        // the floor -> the exact signal says "no truncation possible".
        let ctx = AgentContext::public_only();
        let query = target.embedding.as_ref().unwrap();
        let initial = db.semantic_search_entries_scored(query, &ctx, k).unwrap();
        assert_eq!(initial.len(), k, "fetch is K-saturated");
        assert!(
            initial.last().unwrap().1 < DEFAULT_ANCHOR_THRESHOLD,
            "lowest returned score is below floor -> escalation must NOT fire"
        );

        auto_anchor("kn-target", &db, None).unwrap();

        let selected: Vec<String> = in_band.iter().map(|&(id, _)| String::from(id)).collect();
        assert_eq!(
            anchors_of(&db, "kn-target"),
            selected,
            "the three in-band anchors are selected from the initial top-K (no escalation needed)"
        );
    }

    // =====================================================================
    // MX_SKIP_WRITE_ANCHOR opt-out (PR #364)
    //
    // Two things under test:
    //   1. write_anchor_enabled — the single source of truth for the gate.
    //      Tested directly (not a re-implemented copy of the condition) for
    //      every accepted value of the flag plus the --no-auto-anchor flag.
    //   2. Durability — a write whose anchoring is skipped (flag ON) still
    //      persists. Proven against a REAL file-backed store across a
    //      drop+reopen, since an in-memory store cannot demonstrate
    //      durability across a fresh connection.
    //
    // commit_entry was REMOVED in this PR: post-#362 the Add path upserts +
    // read-back-verifies + auto_embeds (which upserts again) BEFORE the
    // anchor step, so the entry is provably durable before auto_anchor would
    // run. auto_anchor also returns early WITHOUT upserting whenever an entry
    // has no embedding or no in-band neighbours, so its trailing upsert is an
    // anchor update, not a load-bearing commit. The skip path therefore needs
    // no extra upsert.
    // =====================================================================

    /// Evaluates `write_anchor_enabled(no_auto_anchor)` with
    /// `MX_SKIP_WRITE_ANCHOR` temporarily set to `value` (or removed when
    /// `value` is `None`), then puts the variable back to whatever it was
    /// before the call so the env state does not leak out of the helper.
    fn gate_with_env(value: Option<&str>, no_auto_anchor: bool) -> bool {
        const FLAG: &str = "MX_SKIP_WRITE_ANCHOR";

        /// Sets `name` to `setting`, or removes it for `None`.
        fn apply(name: &str, setting: Option<&str>) {
            // SAFETY: process-wide env mutation, serialized via `#[serial]`
            // on every test that reaches this helper.
            unsafe {
                if let Some(v) = setting {
                    std::env::set_var(name, v);
                } else {
                    std::env::remove_var(name);
                }
            }
        }

        let saved = std::env::var(FLAG).ok();
        apply(FLAG, value);
        let enabled = write_anchor_enabled(no_auto_anchor);
        apply(FLAG, saved.as_deref());
        enabled
    }

    /// With `MX_SKIP_WRITE_ANCHOR` unset and no CLI opt-out the gate is open.
    #[test]
    #[serial]
    fn write_anchor_enabled_unset_flag_runs_anchoring() {
        let enabled = gate_with_env(None, false);
        assert!(
            enabled,
            "unset MX_SKIP_WRITE_ANCHOR must leave anchoring ON (default behavior preserved)"
        );
    }

    /// `MX_SKIP_WRITE_ANCHOR=1` closes the gate.
    #[test]
    #[serial]
    fn write_anchor_enabled_flag_1_skips_anchoring() {
        let enabled = gate_with_env(Some("1"), false);
        assert!(
            !enabled,
            "MX_SKIP_WRITE_ANCHOR=1 must turn write-path anchoring OFF"
        );
    }

    /// `MX_SKIP_WRITE_ANCHOR=true` closes the gate, in either letter case.
    #[test]
    #[serial]
    fn write_anchor_enabled_flag_true_skips_anchoring() {
        let cases = [
            (
                "true",
                "MX_SKIP_WRITE_ANCHOR=true must turn write-path anchoring OFF",
            ),
            ("TRUE", "MX_SKIP_WRITE_ANCHOR is case-insensitive for 'true'"),
        ];
        for (value, why) in cases {
            assert!(!gate_with_env(Some(value), false), "{}", why);
        }
    }

    /// Values other than "1"/"true" do not opt out; the gate stays open.
    /// Exercised here with "0", "false" and the empty string (the original
    /// note says this matches the MX_SKIP_SCHEMA convention).
    #[test]
    #[serial]
    fn write_anchor_enabled_other_values_run_anchoring() {
        let cases = [
            ("0", "'0' must not opt out"),
            ("false", "'false' must not opt out"),
            ("", "empty must not opt out"),
        ];
        for (value, why) in cases {
            assert!(gate_with_env(Some(value), false), "{}", why);
        }
    }

    /// `--no-auto-anchor` closes the gate regardless of the env var.
    #[test]
    #[serial]
    fn write_anchor_enabled_cli_flag_always_skips() {
        let cases = [
            (
                None,
                "--no-auto-anchor must skip anchoring even with the env flag unset",
            ),
            (
                Some("0"),
                "--no-auto-anchor must skip anchoring even when env flag would allow it",
            ),
        ];
        for (env_value, why) in cases {
            assert!(!gate_with_env(env_value, true), "{}", why);
        }
    }

    /// Durability of the skip path: with anchoring skipped (flag ON), a plain
    /// `upsert_knowledge` write must still be readable after the handle is
    /// dropped and the same on-disk store is reopened.
    ///
    /// `MX_SKIP_WRITE_ANCHOR` is restored by a drop guard, so the previous
    /// value comes back even when one of the `unwrap()`s or assertions panics
    /// part-way through. Restoring only on the success path would leak
    /// `MX_SKIP_WRITE_ANCHOR=1` into later tests in the same process.
    #[test]
    #[serial]
    fn skipped_anchor_write_persists_across_reopen() {
        // Puts MX_SKIP_WRITE_ANCHOR back to its captured state on drop, which
        // also runs while unwinding from a panic.
        struct SkipFlagGuard {
            saved: Option<String>,
        }

        impl Drop for SkipFlagGuard {
            fn drop(&mut self) {
                // SAFETY: process-wide env mutation, serialized via
                // `#[serial]` on the enclosing test.
                unsafe {
                    match self.saved.take() {
                        Some(v) => std::env::set_var("MX_SKIP_WRITE_ANCHOR", v),
                        None => std::env::remove_var("MX_SKIP_WRITE_ANCHOR"),
                    }
                }
            }
        }

        // The honest durability test: with anchoring skipped (flag ON), a
        // write must survive being dropped and re-opened from a REAL
        // file-backed store. This is what the write path actually does —
        // upsert_knowledge — minus the auto_anchor step the flag removes.
        //
        // An in-memory store cannot prove this (it dies with the handle), so
        // we use a file-backed store against a tempdir path and reopen it.
        //
        // CRITICAL: we go through `open_file_backed_for_test`, NOT the plain
        // `SurrealDatabase::open`. `open` reads `MX_SURREAL_*` env, and if the
        // ambient shell sets `MX_SURREAL_MODE=network` the explicit tempdir
        // path is ignored and the write lands on the LIVE database — which is
        // exactly how a dim-4 fixture once poisoned every production cosine
        // scan. `open_file_backed_for_test` forces an embedded store at the
        // tempdir and asserts the endpoint is local before any write.
        clear_agent_env();
        let _restore_flag = SkipFlagGuard {
            saved: std::env::var("MX_SKIP_WRITE_ANCHOR").ok(),
        };
        // SAFETY: process-wide env mutation, serialized via `#[serial]`.
        unsafe { std::env::set_var("MX_SKIP_WRITE_ANCHOR", "1") };

        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("durability.surreal");

        // Precondition: the gate is closed, i.e. the handler would NOT call
        // auto_anchor — only the plain write happens.
        assert!(
            !write_anchor_enabled(false),
            "precondition: flag=1 must skip anchoring"
        );

        // Write phase: open, upsert, drop the handle (simulating process exit).
        {
            let db = SurrealDatabase::open_file_backed_for_test(&db_path).unwrap();
            let entry =
                entry_with_embedding("kn-skip-durable", unit_query(), "public", None, vec![]);
            db.upsert_knowledge(&entry).unwrap();
        }

        // Reopen phase: a brand-new connection to the same on-disk store.
        let reopened = SurrealDatabase::open_file_backed_for_test(&db_path).unwrap();
        let ctx = AgentContext::public_only();
        let got = reopened.get("kn-skip-durable", &ctx).unwrap();

        assert!(
            got.is_some(),
            "a write with anchoring skipped must persist across a drop+reopen (no commit_entry needed)"
        );
        // `_restore_flag` is dropped here (or during unwinding) and restores
        // the previous MX_SKIP_WRITE_ANCHOR state.
    }
}