ai-memory 0.7.1

AI-agnostic persistent memory system — MCP server, HTTP API, and CLI for any AI platform
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
// Copyright 2026 AlphaOne LLC
// SPDX-License-Identifier: Apache-2.0

//! Memory query / bulk HTTP handlers — `list_memories`,
//! `search_memories`, `forget_memories`, and `bulk_create`.
//!
//! Extracted from [`super::http`] under issue #650 (handler cap ≤1200
//! LOC). Handler bodies are unchanged; only the module surface moved.
//! Wire compatibility preserved via `pub use memories_query::*` in
//! [`super`].

#![allow(clippy::too_many_lines)]

use crate::models::ConfidenceSource;
use crate::models::field_names;
use axum::{
    Json,
    extract::{Query, State},
    http::{HeaderMap, StatusCode},
    response::IntoResponse,
};
use chrono::{Duration, Utc};
use serde_json::json;
use std::sync::Arc;
use uuid::Uuid;

use crate::db;
use crate::models::{CreateMemory, ForgetQuery, ListQuery, Memory, SearchQuery};
use crate::validate;

use super::AppState;
use super::BULK_FANOUT_CONCURRENCY;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;

/// #951 (Track A QC sweep, 2026-05-20) — replaced the local
/// duplicate of `is_visible_to_caller` with a (private) import of the
/// canonical helper at [`crate::visibility::is_visible_to_caller`].
/// The local copy was missing the `metadata.target_agent_id` inbox
/// carve-out that the canonical SAL version had — the drift would
/// have silently blocked recipients from seeing their own private-
/// scope inbox messages on list/kg-query paths. Single source now;
/// both `sal` and non-sal builds share the same predicate.
use crate::visibility::is_visible_to_caller;

pub async fn list_memories(
    State(app): State<AppState>,
    headers: HeaderMap,
    Query(p): Query<ListQuery>,
) -> impl IntoResponse {
    // #197: validate agent_id filter values
    if let Some(ref aid) = p.agent_id
        && let Err(e) = validate::validate_agent_id(aid)
    {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": format!("invalid agent_id filter: {e}")})),
        )
            .into_response();
    }

    // #910 (security-medium, 2026-05-19) — resolve the caller via the
    // `X-Agent-Id` header so the scope=private visibility filter below
    // has a known principal to compare `metadata.agent_id` against.
    // Pre-#910 the handler skipped this step entirely and returned
    // every row matching the requested namespace/tier/etc. shape — an
    // attacker could enumerate scope=private rows authored by other
    // agents by listing their namespace. Header-only authentication
    // (no body field on this GET path); anonymous callers get a
    // per-request `anonymous:req-…` id and see only non-private rows.
    let header_agent_id = headers
        .get(crate::HEADER_AGENT_ID)
        .and_then(|v| v.to_str().ok());
    let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
        Ok(id) => id,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
            )
                .into_response();
        }
    };

    // v0.7.0 Wave-3 — Postgres-backed daemons dispatch through the
    // SAL trait. The trait's `Filter` shape carries
    // `(namespace, tier, tags_any, agent_id, since, until, limit)`,
    // which is the same projection the legacy `db::list` accepts plus
    // a deterministic ordering. The `min_priority` and `offset`
    // filters that exist only on the SQLite path are not yet exposed
    // through the trait — when set on a Postgres daemon they are
    // silently ignored (logged at debug). Offset can be emulated
    // client-side by raising `limit` and slicing; min_priority is
    // tracked for trait extension in the next wave.
    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        if p.offset.unwrap_or(0) > 0 {
            tracing::debug!(
                "list_memories on postgres: ?offset is unsupported on the SAL trait; ignored"
            );
        }
        if p.min_priority.is_some() {
            tracing::debug!(
                "list_memories on postgres: ?min_priority is unsupported on the SAL trait; ignored"
            );
        }
        let limit = p.limit.unwrap_or(20).min(app.max_page_size);
        let since = p
            .since
            .as_deref()
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|d| d.with_timezone(&chrono::Utc));
        let until = p
            .until
            .as_deref()
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|d| d.with_timezone(&chrono::Utc));
        let filter = crate::store::Filter {
            namespace: p.namespace.clone(),
            tier: p.tier.clone(),
            // #869 audit (Category B — safe default): missing `tags`
            // querystring collapses to empty `Vec<String>` which the
            // SAL `Filter` treats as "no tag filter" — documented.
            tags_any: p
                .tags
                .as_deref()
                .map(|s| s.split(',').map(str::to_string).collect())
                .unwrap_or_default(),
            agent_id: p.agent_id.clone(),
            since,
            until,
            limit,
        };
        let ctx = crate::store::CallerContext::for_agent(&caller);
        return match app.store.list(&ctx, &filter).await {
            Ok(mems) => {
                // #910 — post-filter scope=private rows the caller does
                // not own. Done in-process rather than via the SAL
                // `Filter` because the trait's filter shape does not
                // carry a scope axis yet (tracked for the next trait
                // extension wave); the post-filter is correctness-
                // equivalent to a WHERE clause at the SQL layer for
                // the result-set sizes that fit the trait's `limit`.
                let visible: Vec<Memory> = mems
                    .into_iter()
                    .filter(|m| is_visible_to_caller(m, &caller))
                    .collect();
                Json(json!({"memories": &visible, "count": visible.len()})).into_response()
            }
            Err(e) => store_err_to_response(e),
        };
    }

    let lock = app.db.lock().await;
    // v0.6.2 (S40): raise ceiling from 200 → the operator-resolved
    // `app.max_page_size` (compiled default `MAX_BULK_SIZE` = 1000) so bulk
    // fanout scenarios that POST 500+ rows to a leader can verify full
    // peer delivery via a single `GET /memories?limit=N` (previously the
    // list silently capped at 200 regardless of whether fanout worked).
    // Default remains 20 — only explicit `?limit=` callers see the
    // higher ceiling.
    let limit = p.limit.unwrap_or(20).min(app.max_page_size);
    match db::list(
        &lock.0,
        p.namespace.as_deref(),
        p.tier.as_ref(),
        limit,
        p.offset.unwrap_or(0),
        p.min_priority,
        p.since.as_deref(),
        p.until.as_deref(),
        p.tags.as_deref(),
        p.agent_id.as_deref(),
    ) {
        Ok(mems) => {
            // #910 — see postgres branch comment above. `db::list` does
            // NOT apply the visibility-prefix filter that `db::search`
            // and `db::recall_hybrid` use; that gap is what closed the
            // cross-tenant enumeration vector. Post-filter in-process
            // until the next storage-layer wave threads a `caller`
            // through `db::list` and rewrites the WHERE clause to use
            // the same `visibility_clause` helper as the search path.
            let visible: Vec<Memory> = mems
                .into_iter()
                .filter(|m| is_visible_to_caller(m, &caller))
                .collect();
            Json(json!({"memories": &visible, "count": visible.len()})).into_response()
        }
        Err(e) => crate::handlers::errors::handler_error_500(&e),
    }
}

/// HTTP full-text search over memories, with query-string filters.
///
/// Each of these validation failures returns 400:
/// * `q` and `source_uri` are both blank (#891: `source_uri` alone is a
///   valid reciprocal query).
/// * `agent_id` fails `validate_agent_id` (#197).
/// * `as_agent` fails `validate_namespace` (#151).
/// * `format` does not parse as a [`crate::toon::WireFormat`] (#1579 B4).
/// * On the non-Postgres path, a non-blank `source_uri` fails
///   `validate_source_uri`.
///
/// Visibility (#942): the caller is resolved from the `X-Agent-Id`
/// header. If resolution fails, the request is treated as an anonymous
/// request (`anonymous_request_id()`), never as "no principal". A missing
/// principal would disable the scope=private filter on the SQLite path.
///
/// Dispatch:
/// * Postgres (`sal` feature): `app.store.search` with a SAL `Filter` and
///   a `CallerContext` that never bypasses visibility. The trait carries
///   no `source_uri`, so that parameter is ignored there (debug-logged).
/// * Otherwise: if `q` is blank and `source_uri` is set, the index-only
///   `db::list_by_source_uri`; else `db::search_with_source_uri`. The
///   visibility principal is `?as_agent=` when supplied, else the
///   resolved caller.
///
/// `limit` defaults to 20 and is capped at `app.max_page_size`. Results
/// are serialized by `wire_format::search_response` in the negotiated
/// format.
pub async fn search_memories(
    State(app): State<AppState>,
    headers: axum::http::HeaderMap,
    Query(p): Query<SearchQuery>,
) -> impl IntoResponse {
    // #891: source_uri-only queries are valid (Gap 6 #889 reciprocal
    // queries). Reject only when BOTH q and source_uri are empty.
    let source_uri_empty = p.source_uri.as_deref().is_none_or(|s| s.trim().is_empty());
    if p.q.trim().is_empty() && source_uri_empty {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": "query or source_uri is required"})),
        )
            .into_response();
    }
    // #197: validate agent_id filter values
    if let Some(ref aid) = p.agent_id
        && let Err(e) = validate::validate_agent_id(aid)
    {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": format!("invalid agent_id filter: {e}")})),
        )
            .into_response();
    }
    // #151 visibility: validate --as-agent namespace if supplied
    if let Some(ref a) = p.as_agent
        && let Err(e) = validate::validate_namespace(a)
    {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": crate::errors::msg::invalid("as_agent", e)})),
        )
            .into_response();
    }
    // #1579 B4 — negotiate the response format BEFORE doing any work
    // (json default | toon | toon_compact; invalid → 400 with the
    // SSOT message). Mirrors the recall handlers.
    let format = match crate::toon::WireFormat::parse_http(p.format.as_deref()) {
        Ok(f) => f,
        Err(e) => return crate::handlers::wire_format::invalid_format_response(&e),
    };

    // #942 SECURITY-high — resolve the visibility principal once, for both
    // backends. If `X-Agent-Id` fails resolution, fall back to an anonymous
    // per-request id, as the Postgres branch always did. The SQLite path
    // used to call `.ok()` here instead. A malformed header then produced
    // `as_agent = None`, which means an unfiltered search that exposed
    // other tenants' scope=private rows.
    let header_agent_id = headers
        .get(crate::HEADER_AGENT_ID)
        .and_then(|v| v.to_str().ok());
    let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
        .unwrap_or_else(|_| crate::identity::anonymous_request_id());

    // v0.6.2 (S40): mirror the `list_memories` ceiling raise so search
    // over a bulk-populated namespace isn't also capped at 200.
    let limit = p.limit.unwrap_or(20).min(app.max_page_size);

    // v0.7.0 Wave-3 — Postgres-backed daemons dispatch through the
    // SAL trait. The Postgres adapter's `search` runs the same
    // text-search projection as SQLite's FTS5 path with the trait's
    // `Filter` carried verbatim; result wire-shape matches the
    // legacy `db::search` envelope.
    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        if !source_uri_empty {
            tracing::debug!(
                "search_memories on postgres: ?source_uri is unsupported on the SAL trait; ignored"
            );
        }
        let since = p
            .since
            .as_deref()
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|d| d.with_timezone(&chrono::Utc));
        let until = p
            .until
            .as_deref()
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|d| d.with_timezone(&chrono::Utc));
        let filter = crate::store::Filter {
            namespace: p.namespace.clone(),
            tier: p.tier.clone(),
            // #869 audit (Category B — safe default): missing `tags`
            // querystring collapses to empty `Vec<String>` which the
            // SAL `Filter` treats as "no tag filter" — documented.
            tags_any: p
                .tags
                .as_deref()
                .map(|s| s.split(',').map(str::to_string).collect())
                .unwrap_or_default(),
            agent_id: p.agent_id.clone(),
            since,
            until,
            limit,
        };
        let ctx = crate::store::CallerContext {
            agent_id: caller,
            as_agent: p.as_agent.clone(),
            request_id: None,
            // #910 — tenant-facing path; never bypass the visibility filter.
            bypass_visibility: false,
        };
        return match app.store.search(&ctx, &p.q, &filter).await {
            // #1579 B4 — serialize per the negotiated format.
            Ok(r) => crate::handlers::wire_format::search_response(
                format,
                json!({"results": r, "count": r.len(), "query": p.q}),
            ),
            Err(e) => store_err_to_response(e),
        };
    }

    // #942 — the visibility principal is `?as_agent=` when supplied,
    // otherwise the header-resolved (or anonymous) caller. `as_agent` is
    // the caller's namespace ancestor (agent_id IS the agent's namespace
    // prefix per src/identity/mod.rs); `compute_visibility_prefixes`
    // walks ancestors from there. Never `None`, so the filter always
    // applies.
    let visibility_principal: &str = p.as_agent.as_deref().unwrap_or(caller.as_str());

    // v0.7.0 Provenance Gap 6 (#889) — `?source_uri=X` reciprocal
    // filter. It composes with `?q=…`. When `q` is empty and `source_uri`
    // is set, the request routes through the index-only
    // `list_by_source_uri` path ("every memory from this document").
    // Validate before taking the DB mutex so a 400 never contends on it.
    let source_uri = p
        .source_uri
        .as_deref()
        .map(str::trim)
        .filter(|s| !s.is_empty());
    if let Some(uri) = source_uri
        && let Err(e) = validate::validate_source_uri(uri)
    {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": format!("invalid source_uri filter: {e}")})),
        )
            .into_response();
    }

    let lock = app.db.lock().await;
    if let Some(uri) = source_uri
        && p.q.trim().is_empty()
    {
        // #975 — the source_uri-only reciprocal endpoint applies the same
        // scope=private gate as the q+source_uri compose path.
        return match db::list_by_source_uri(
            &lock.0,
            uri,
            p.namespace.as_deref(),
            Some(limit),
            Some(visibility_principal),
        ) {
            // #1579 B4 — serialize per the negotiated format.
            Ok(r) => crate::handlers::wire_format::search_response(
                format,
                json!({"results": r, "count": r.len(), (field_names::SOURCE_URI): uri}),
            ),
            Err(e) => crate::handlers::errors::handler_error_500(&e),
        };
    }
    match db::search_with_source_uri(
        &lock.0,
        &p.q,
        p.namespace.as_deref(),
        p.tier.as_ref(),
        limit,
        p.min_priority,
        p.since.as_deref(),
        p.until.as_deref(),
        p.tags.as_deref(),
        p.agent_id.as_deref(),
        Some(visibility_principal),
        false,
        source_uri,
    ) {
        // #1579 B4 — serialize per the negotiated format.
        Ok(r) => crate::handlers::wire_format::search_response(
            format,
            json!({"results": r, "count": r.len(), "query": p.q}),
        ),
        Err(e) => crate::handlers::errors::handler_error_500(&e),
    }
}

pub async fn forget_memories(
    State(app): State<AppState>,
    headers: axum::http::HeaderMap,
    Json(body): Json<ForgetQuery>,
) -> impl IntoResponse {
    // #942 SECURITY-high (Track A QC sweep, 2026-05-20) — admin-only
    // gate on bulk-forget. `db::forget` is a destructive operation
    // that deletes by namespace + pattern + tier filter; it has no
    // per-row caller filter and adding one without a substrate
    // refactor (touching the SQL CTEs that drive the FTS join) is
    // bigger than the QC sweep budget. Restrict to operators in the
    // admin allowlist (introduced by the #957 fix in commit
    // df7f72545) — same posture as `export_memories`.
    if let Err(resp) = crate::handlers::admin_role::require_admin(&app, &headers, "forget_memories")
    {
        return resp;
    }
    // v0.7.0 Wave-3 Continuation 3 (Phase 13) — route through SAL trait
    // on postgres-backed daemons. Sqlite-backed daemons keep the legacy
    // `db::forget` free-function path verbatim.
    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        let archive_flag = {
            let lock = app.db.lock().await;
            lock.3
        };
        // QC P1 fix (2026-05-20): header-resolved caller so forget()
        // only deletes memories the caller owns. Pre-fix the
        // hardcoded `for_agent("http")` would have let any caller
        // delete memories that matched the namespace/pattern filter
        // regardless of ownership — a destructive privacy bug.
        let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
        return match app
            .store
            .forget(
                &ctx,
                body.namespace.as_deref(),
                body.pattern.as_deref(),
                body.tier.as_ref(),
                archive_flag,
            )
            .await
        {
            Ok(n) => Json(json!({"deleted": n})).into_response(),
            Err(e) => store_err_to_response(e),
        };
    }

    let lock = app.db.lock().await;
    match db::forget(
        &lock.0,
        body.namespace.as_deref(),
        body.pattern.as_deref(),
        body.tier.as_ref(),
        lock.3, // archive_on_gc
    ) {
        Ok(n) => Json(json!({"deleted": n})).into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": e.to_string()})),
        )
            .into_response(),
    }
}

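// ============================================================================
// v0.7.0 Wave-3 Continuation 6 — three REST endpoints closing F7 cert-harness
// gaps (S52 `links/verify`, S61 `quota/status`, S65 `kg/find_paths`).
// NOTE(review): neither these handlers nor the L6/L9/L10 handlers described
// in the banners below appear to be defined in this module. These banners
// look like leftovers from the #650 extraction; confirm, then move them next
// to the handlers they describe.
// ============================================================================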

// ---------------------------------------------------------------------------
// v0.7.0 L6 — `/api/v1/auto_tag` + `/api/v1/expand_query` (S51 surface)
// ---------------------------------------------------------------------------
//
// S51 (autonomous-tier LLM surface) exercises four HTTP endpoints:
// `auto_tag`, `consolidate`, `expand_query`, `detect_contradiction`.
// Pre-L6 the daemon only registered `consolidate` + `contradictions`;
// the other two were available via MCP only. L6 adds the two missing
// REST endpoints with response shapes that match what S51 reads from
// the body (`tags: [...]` and `expansions: [...]`), gated by
// `app.llm.is_some()` so the keyword / semantic tiers (no LLM wired)
// surface a clean 503 instead of a confusing 500.

// ---------------------------------------------------------------------------
// v0.7.0 L9 — `GET /api/v1/tools/list` (NHI-D-501-postgres-traits)
// ---------------------------------------------------------------------------
//
// HTTP parity for the MCP `tools/list` JSON-RPC method. Surfaces the
// canonical tool catalog the daemon advertises under its resolved
// `Profile`, computed from in-memory configuration only — no DB access
// — so the postgres and sqlite paths return byte-identical bodies.
//
// NHI surfaced this as `NHI-D-501-postgres-traits` because the
// postgres-gated daemon returned the generic 501 envelope for the path
// even though the response is pure enumeration. The 501 was a false
// negative: the handler can be implemented entirely off `app.profile`
// + `app.mcp_config`.

// ---------------------------------------------------------------------------
// v0.7.0 L10 — `POST /api/v1/memory_load_family`
// ---------------------------------------------------------------------------
//
// HTTP parity for the MCP `memory_load_family` tool. Filters memories
// by `metadata.family` (a free-form JSON field stamped by the B1 path)
// and returns the top-k recent + high-priority rows. NHI surfaced
// `NHI-D-501-postgres-loadfamily` for the same reason as L9 — the
// endpoint was 501'd on postgres even though `app.store.list(...)`
// already exposes the underlying scan. The handler now dispatches
// through SAL on postgres and through `db::list` on sqlite, doing a
// post-filter on `metadata.family` in-memory because that field is not
// yet a first-class SAL filter axis.

/// Builds the [`Memory`] row persisted for one already-validated
/// `bulk_create` item.
///
/// Shared by the postgres (SAL) and sqlite branches of [`bulk_create`] so
/// the two struct literals cannot drift apart again (#1385 / #1411 / #1422
/// each had to be re-applied to the second copy as a "sister" fix).
///
/// - `caller`: the resolved principal. Written into `metadata.agent_id`
///   when the request metadata is a JSON object (#910); non-object
///   metadata is stored unchanged.
/// - `now_rfc3339`: the request timestamp, used for both `created_at` and
///   `updated_at`.
/// - `expires_at`: the already-resolved expiry. The caller decides whether
///   a tier-default TTL applies, so `body.expires_at` / `body.ttl_secs` are
///   not consulted here.
fn bulk_row_to_memory(
    body: CreateMemory,
    caller: &str,
    now_rfc3339: &str,
    expires_at: Option<String>,
) -> Memory {
    // #910 — stamp metadata.agent_id from the resolved caller so the
    // visibility filter recognises the row as owned by the writer on later
    // get/list/recall.
    let mut metadata = body.metadata;
    if let Some(obj) = metadata.as_object_mut() {
        obj.insert(
            "agent_id".to_string(),
            serde_json::Value::String(caller.to_string()),
        );
    }
    // v0.7.0 #1422 — thread the validated kind through instead of a
    // hardcoded default. Resolved before the struct literal so the borrow
    // of `body.kind` ends before other fields are moved out.
    let memory_kind = body
        .kind
        .as_deref()
        .and_then(crate::models::MemoryKind::from_str)
        .unwrap_or_default();
    // #1591 — an omitted confidence resolves to the compiled default and is
    // recorded with `Default` provenance rather than `CallerProvided`.
    let confidence_source = if body.confidence.is_some() {
        ConfidenceSource::CallerProvided
    } else {
        ConfidenceSource::Default
    };
    Memory {
        id: Uuid::new_v4().to_string(),
        tier: body.tier,
        namespace: body.namespace,
        title: body.title,
        content: body.content,
        tags: body.tags,
        priority: body.priority.clamp(1, 10),
        confidence: body
            .confidence
            .unwrap_or(crate::models::DEFAULT_CONFIDENCE)
            .clamp(0.0, 1.0),
        source: body.source,
        access_count: 0,
        created_at: now_rfc3339.to_string(),
        updated_at: now_rfc3339.to_string(),
        last_accessed_at: None,
        expires_at,
        metadata,
        reflection_depth: 0,
        memory_kind,
        entity_id: None,
        persona_version: None,
        citations: body.citations,
        source_uri: body.source_uri,
        source_span: body.source_span,
        confidence_source,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    }
}

/// Bulk-create handler: validates and persists a JSON array of
/// [`CreateMemory`] bodies, reporting per-row failures in the response
/// instead of aborting the whole request.
///
/// - More than `app.max_page_size` items → `400` with an `error` message.
/// - Every stored row has `metadata.agent_id` stamped with the caller
///   resolved from the agent-id header (falling back to an anonymous
///   per-request principal).
/// - Postgres (`sal` feature): each row passes the namespace governance
///   gate; Allowed rows are written in one `store_batch` call. Response:
///   `{"created", "errors", "pending"}`.
/// - SQLite: rows are inserted under the DB lock, then fanned out to
///   federation peers (when configured) after the lock is released.
///   Response: `{"created", "errors"}`.
///
/// Validation and storage errors are passed through
/// `sanitize_bulk_row_error` before being echoed (#851).
pub async fn bulk_create(
    State(app): State<AppState>,
    headers: HeaderMap,
    Json(bodies): Json<Vec<CreateMemory>>,
) -> impl IntoResponse {
    if bodies.len() > app.max_page_size {
        return (
            StatusCode::BAD_REQUEST,
            Json(
                json!({"error": format!("bulk operations limited to {} items", app.max_page_size)}),
            ),
        )
            .into_response();
    }
    let now = Utc::now();
    // Every row in the batch shares one timestamp for created_at/updated_at.
    let now_rfc3339 = now.to_rfc3339();

    // #910 SAL-level — resolve the caller so the per-row metadata
    // stamp matches the authenticated principal. Pre-#910 the bulk
    // path stored `body.metadata` verbatim, so rows landed with no
    // agent_id and the subsequent list/get round-trip via the
    // scope=private filter dropped every one of them. Header-only
    // authentication; anonymous callers stamp `anonymous:req-<uuid>`.
    let header_agent_id = headers
        .get(crate::HEADER_AGENT_ID)
        .and_then(|v| v.to_str().ok());
    let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
        .unwrap_or_else(|_| crate::identity::anonymous_request_id());

    // v0.7.0 Wave-3 Continuation — postgres-backed daemons persist rows
    // through the SAL. Federation fanout below stays sqlite-only because
    // the federation transport assumes the SQLite-on-disk model; postgres
    // deployments use the postgres replica mechanism for cross-node
    // visibility, not HTTP fanout. The response carries the same
    // `created` / `errors` fields as the sqlite path, plus a postgres-only
    // `pending` array for governance-held rows.
    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        use crate::models::GovernanceDecision;

        // QC P1 fix (2026-05-20): the SAL context carries the caller
        // resolved above rather than the hardcoded "daemon" sentinel, so
        // visibility checks the SAL runs internally (e.g. the
        // `governance_pending_create` precondition lookup) see the real
        // writer. `metadata.agent_id` is stamped by `bulk_row_to_memory`.
        let ctx = crate::store::CallerContext::for_agent(caller.clone());
        let mut errors: Vec<String> = Vec::new();
        let mut pending: Vec<serde_json::Value> = Vec::new();
        // #1481 — collect the governance-Allowed rows and persist them in
        // ONE multi-row INSERT via `store_batch`, instead of one `store()`
        // round-trip per row. Validation / Deny / Pending still accumulate
        // per row; only the persistence of the surviving rows is batched.
        let mut allowed: Vec<Memory> = Vec::new();
        for mut body in bodies {
            if let Err(e) = validate::RequestValidator::validate_create(&body) {
                // Issue #851: do not echo the caller's title back paired
                // with the raw error — both are caller-influenced, and
                // the combo can be used to verify presence/shape of
                // server-side fields. Sanitize and log instead.
                tracing::warn!("bulk_create(postgres): validate_create failed: {e}");
                errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
                continue;
            }
            // NOTE(review): unlike the sqlite branch, no tier-default TTL
            // (`ttl_for_tier`) is applied here when the caller omits both
            // `expires_at` and `ttl_secs`. Confirm whether the SAL store
            // applies it or whether this is an intentional gap.
            let expires_at = body.expires_at.take().or_else(|| {
                body.ttl_secs
                    .map(|s| (now + Duration::seconds(s)).to_rfc3339())
            });
            let mem = bulk_row_to_memory(body, &caller, &now_rfc3339, expires_at);

            // F-A2A1.5 (#705) — governance enforcement on the postgres
            // bulk_create path. Mirrors F-A2A1.2 delete/promote and the
            // Wave-3 Continuation 3 create_memory gate. Each row is a
            // Store action against its own namespace, so the standard's
            // `write=` rule must be consulted per row. Deny rows
            // accumulate into `errors`; Pending rows accumulate into
            // `pending` with their pending_id. Without this gate,
            // postgres-backed daemons silently bypassed namespace
            // governance on the bulk-create surface.
            //
            // The principal is the resolved caller itself. Reading
            // `metadata.agent_id` back with a DAEMON_PRINCIPAL fallback
            // would evaluate a row whose metadata is not a JSON object
            // (so the #910 stamp was skipped) as the daemon, not the writer.
            let payload_for_pending = serde_json::to_value(&mem).unwrap_or_else(|_| json!({}));
            match app
                .store
                .enforce_governance_action(
                    crate::store::GovernedAction::Store,
                    &mem.namespace,
                    caller.as_str(),
                    None,
                    None,
                    &payload_for_pending,
                )
                .await
            {
                Ok(GovernanceDecision::Allow) => {}
                Ok(GovernanceDecision::Deny(refusal)) => {
                    errors.push(format!(
                        "{}: bulk_create denied by governance: {reason}",
                        mem.title,
                        reason = refusal.reason,
                    ));
                    continue;
                }
                Ok(GovernanceDecision::Pending(pending_id)) => {
                    pending.push(json!({
                        "title": mem.title,
                        "namespace": mem.namespace,
                        (field_names::PENDING_ID): pending_id,
                    }));
                    continue;
                }
                Err(e) => {
                    // Issue #851: SAL errors can carry raw sqlx text; do not
                    // pair it with the caller's title. Log the detail and
                    // echo only the sanitized form.
                    tracing::warn!("bulk_create(postgres): governance check failed: {e}");
                    errors.push(format!(
                        "governance error: {}",
                        super::sanitize_bulk_row_error(&e.to_string())
                    ));
                    continue;
                }
            }

            allowed.push(mem);
        }

        // #1481 — single batched upsert for every Allowed row. The batch
        // is atomic: on success each row counts as `created` (matching
        // the prior per-row semantics, including upserts); on failure the
        // whole batch rolled back, so all Allowed rows report one
        // sanitized error rather than partially-applied state.
        let created: usize = if allowed.is_empty() {
            0
        } else {
            match app.store.store_batch(&ctx, &allowed).await {
                Ok(ids) => ids.len(),
                Err(e) => {
                    // Issue #851: SAL store errors can carry raw sqlx
                    // text. Sanitize before echoing.
                    tracing::warn!("bulk_create(postgres): store_batch failed: {e}");
                    errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
                    0
                }
            }
        };
        return Json(json!({
            "created": created,
            "errors": errors,
            "pending": pending,
        }))
        .into_response();
    }

    // Stage 1 — validate + insert locally. Collect the successfully-inserted
    // `Memory` values so we can fan out each one after we release the DB
    // lock (peers POST to our /sync/push and we'd deadlock on the Mutex if
    // we held it across the network call).
    //
    // NOTE(review): this branch does not call `enforce_governance_action`;
    // confirm sqlite namespace governance is enforced elsewhere.
    let mut created_mems: Vec<Memory> = Vec::new();
    let mut errors: Vec<String> = Vec::new();
    {
        let lock = app.db.lock().await;
        for mut body in bodies {
            if let Err(e) = validate::RequestValidator::validate_create(&body) {
                // Issue #851: do not echo the caller's title back paired
                // with the raw error. Sanitize and log instead.
                tracing::warn!("bulk_create: validate_create failed: {e}");
                errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
                continue;
            }
            // Explicit `expires_at` wins; otherwise the caller's `ttl_secs`,
            // otherwise the configured per-tier default TTL (if any).
            let expires_at = body.expires_at.take().or_else(|| {
                body.ttl_secs
                    .or(lock.2.ttl_for_tier(&body.tier))
                    .map(|s| (now + Duration::seconds(s)).to_rfc3339())
            });
            let mem = bulk_row_to_memory(body, &caller, &now_rfc3339, expires_at);
            match db::insert(&lock.0, &mem) {
                Ok(_) => created_mems.push(mem),
                Err(e) => {
                    // Issue #851: db::insert errors include raw rusqlite
                    // text (constraint names, SQL fragments). Sanitize.
                    tracing::warn!("bulk_create: db::insert failed: {e}");
                    errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
                }
            }
        }
    }
    // Stage 2 — federation fanout, once per successfully-inserted row.
    //
    // v0.6.2 (S40): each row's `broadcast_store_quorum` runs concurrently in
    // a `tokio::task::JoinSet`, bounded by a semaphore so at most
    // `BULK_FANOUT_CONCURRENCY` fanouts are in flight. The earlier
    // sequential loop paid one full ack round-trip per row — 500 rows ×
    // ~100ms = 50s, far beyond the scenario's 20s settle window, so peers
    // only received the first ~200 writes in time.
    //
    // Why bounded? Spawning every row at once fires N × peers concurrent
    // reqwest POSTs (500 rows × 3 peers = 1500 TCP connects), which
    // exhausts ephemeral ports and the client connection pool and shows up
    // as `network: error sending request` on most rows. The bound trades:
    //   - Speedup over sequential: wall time ≈ N / bound × ack, so the
    //     bound must be at least a few to clear 500 rows × 100ms in 20s.
    //   - Peer-side contention: every in-flight fanout lands a sync_push
    //     POST on the same SQLite Mutex on each peer; too many serialize
    //     there and either time out the quorum window or hit leader-side
    //     connection-pool / ephemeral-port limits.
    // The documented tuning is 8 (500 × 100ms / 8 ≈ 6.2s wall), more
    // conservative than the 32 (≈1.6s) first considered, to keep each
    // peer's per-writer queue short. `BULK_FANOUT_CONCURRENCY` holds the
    // authoritative value.
    //
    // The semaphore only limits concurrency; it does NOT weaken any single
    // row's quorum contract. Per-row outcomes:
    //   - quorum miss (`finalise_quorum` Err) → `"{id}: {err}"` in `errors`.
    //     The loop keeps going: one miss must not abort the other rows the
    //     caller just paid for (bulk semantics, deliberately weaker than
    //     `create_memory`'s 503 short-circuit);
    //   - `broadcast_store_quorum` itself failing → logged only; the local
    //     commit stands and the row is NOT reported in `errors`;
    //   - a panicked / cancelled fanout task → logged only.
    if let Some(fed) = app.federation.as_ref() {
        let sem = Arc::new(tokio::sync::Semaphore::new(BULK_FANOUT_CONCURRENCY));
        let mut joins: tokio::task::JoinSet<(String, Result<(), String>)> =
            tokio::task::JoinSet::new();
        for mem in &created_mems {
            let fed = fed.clone();
            let mem = mem.clone();
            let sem = sem.clone();
            joins.spawn(async move {
                // `acquire_owned` on a semaphore clone the task owns means
                // the permit lives for the task's lifetime — it's released
                // only when the task completes. A closed semaphore would be
                // a bug; surface it via the error channel and keep going.
                let Ok(_permit) = sem.acquire_owned().await else {
                    return (mem.id.clone(), Err("fanout semaphore closed".to_string()));
                };
                let id = mem.id.clone();
                let outcome = match crate::federation::broadcast_store_quorum(&fed, &mem).await {
                    Ok(tracker) => match crate::federation::finalise_quorum(&tracker) {
                        Ok(_) => Ok(()),
                        Err(err) => Err(err.to_string()),
                    },
                    Err(e) => {
                        tracing::warn!(
                            "bulk_create: fanout for {id} failed (local committed): {e:?}"
                        );
                        Ok(())
                    }
                };
                (id, outcome)
            });
        }
        while let Some(res) = joins.join_next().await {
            match res {
                Ok((id, Err(err))) => errors.push(format!("{id}: {err}")),
                Ok((_, Ok(()))) => {}
                Err(e) => tracing::warn!("bulk_create: fanout task join error: {e:?}"),
            }
        }

        // v0.6.2 Patch 2 (S40): terminal catchup batch. Per-row quorum
        // met above, but the post-quorum detach path — even with
        // retry-once in `post_and_classify` — can still leave a peer
        // one row behind under sustained SQLite-mutex contention (v3r26
        // hermes-tls 499/500 and v3r27 ironclaw-off 499/500 both tripped
        // the scenario despite the retry). A single batched `sync_push`
        // per peer with every committed row closes the gap: peer's
        // `insert_if_newer` no-ops rows it already has and applies the
        // missing one. O(1) extra POST per peer vs O(N) per-row retries.
        //
        // Errors are folded into the response `errors` array but do NOT
        // fail the bulk write — the HTTP contract was already settled by
        // the per-row pass above. The catchup only strengthens eventual
        // consistency within the scenario settle window.
        if !created_mems.is_empty() {
            let catchup_errors = crate::federation::bulk_catchup_push(fed, &created_mems).await;
            for (peer_id, err) in catchup_errors {
                errors.push(format!("catchup to {peer_id}: {err}"));
            }
        }
    }
    Json(json!({"created": created_mems.len(), "errors": errors})).into_response()
}

// ===========================================================================
// #868 — inline tests for `handlers/http.rs`.
//
// The code-review verdict pinned `handlers/http.rs` for "0 inline tests
// across remaining prod LOC". This module establishes the discipline:
// one focused test per #866 stage helper so the next refactor has
// shape-pinning. Not aiming for 100% coverage — the integration suite
// under `tests/` already exercises the orchestrated path end-to-end.
//
// Coverage map (10 tests):
//   - resolve_create_agent_id    (4) header / body / metadata / fallback
//   - resolve_create_conflict_title (3) error → 409, version → suffix, merge → passthrough
//   - embed_create_before_lock   (1) no embedder ⇒ (None, Indexed)
//   - validate_create early-return (1) empty title ⇒ 400
//   - GovernanceRefusal downcast (1) → 403 + GOVERNANCE_REFUSED code
// ===========================================================================