Skip to main content

pond/
handlers.rs

1//! Transport-agnostic wire handlers (spec.md#protocol), one inner module per
2//! operation.
3
4fn map_error(error: crate::Error) -> crate::wire::ErrorEnvelope {
5    error.into()
6}
7
/// Typed handle for the namespace a wire request is addressed to, stored as
/// a list of path segments. v1 is single-namespace, so a successful resolve
/// always yields [`NamespaceIdent::root`]; keeping a dedicated type means
/// multi-namespace routing can arrive later without touching call sites
/// (spec.md#wire-namespace-resolution).
#[derive(Debug, Clone)]
pub struct NamespaceIdent(pub Vec<String>);

impl NamespaceIdent {
    /// The root namespace: an identifier with no path segments.
    pub fn root() -> Self {
        Self(Vec::new())
    }

    /// Build a table identifier: this namespace's segments, in order,
    /// followed by `table_name` as the final segment.
    pub fn as_table_id(&self, table_name: &str) -> Vec<String> {
        self.0
            .iter()
            .cloned()
            .chain(std::iter::once(table_name.to_owned()))
            .collect()
    }
}
25
26/// The one and only namespace-resolution point; every wire handler funnels
27/// through this. v1 accepts `None` or the default and returns the singleton
28/// root namespace; everything else is a hard reject.
29pub fn resolve_namespace(
30    namespace: Option<&str>,
31) -> Result<NamespaceIdent, crate::wire::ErrorEnvelope> {
32    match namespace {
33        None | Some(crate::wire::DEFAULT_NAMESPACE) => Ok(NamespaceIdent::root()),
34        Some(other) => Err(map_error(crate::Error::namespace_unknown(other))),
35    }
36}
37
38fn map_storage(error: anyhow::Error) -> crate::wire::ErrorEnvelope {
39    // Classify before bucketing: an OCC commit-conflict exhaustion has its own
40    // wire code (spec.md#protocol). Everything else lands in `storage_unavailable`.
41    if let Some(conflict) = error.downcast_ref::<crate::substrate::ConflictExhausted>() {
42        return map_error(crate::Error::Conflict {
43            attempts: conflict.attempts,
44        });
45    }
46    map_error(crate::Error::Storage(error))
47}
48
49mod ingest_handler {
50    use anyhow::Result;
51    use tokio_stream::StreamExt;
52
53    use crate::{
54        adapter::{Adapter, AdapterYield, SkipOracle, SkipReason},
55        sessions::{IngestEvent, IngestSummary, IngestValidator, OutcomeStatus, RowOutcome, Store},
56        wire::{
57            ErrorBody, ErrorCode, IngestEnvelope, IngestRequest, IngestResponse, IngestResult,
58            IngestStatus, validate_protocol,
59        },
60    };
61
62    use super::{map_error, map_storage};
63
64    /// Hard cap on events per `pond_ingest` batch (spec.md#protocol).
65    pub const MAX_INGEST_EVENTS: usize = 1000;
66
67    /// Progress signals emitted by [`ingest_adapter`] for the CLI bar (and
68    /// any other observer). One [`SyncEvent::Discovered`] fires up front
69    /// once `adapter.discover()` returns; then one [`SyncEvent::SessionDone`]
70    /// fires per session as the validator commits it or the adapter skips
71    /// it. The adapter path never errors at the event level - every
72    /// per-session outcome is surfaced through this enum.
73    #[derive(Debug, Clone)]
74    pub enum SyncEvent {
75        /// Up-front session count from `adapter.discover()`. Emitted exactly
76        /// once before any `SessionDone`. When discovery fails, the field is
77        /// `None` and the bar runs in rolling-counter mode.
78        Discovered { total: Option<usize> },
79        /// One session finished: committed, skipped (undecodable source),
80        /// or rejected by the validator.
81        SessionDone(SessionOutcome),
82    }
83
84    /// What happened to one session in an adapter-driven sync.
85    #[derive(Debug, Clone)]
86    pub struct SessionOutcome {
87        /// Project/cwd the session ran in, when the adapter could parse it.
88        pub project: Option<String>,
89        /// Session id, when the source was decodable far enough to read one.
90        /// `None` means the file was unreadable before any `Session` event.
91        pub session_id: Option<String>,
92        /// Messages observed in the source stream (not the same as rows
93        /// written: validator-rejected sessions still report the count).
94        pub messages: usize,
95        pub status: SyncStatus,
96    }
97
98    /// Per-session outcome class.
99    ///
100    /// - `Ok` - committed cleanly, zero drops.
101    /// - `Partial` - committed, but the validator dropped N events from this
102    ///   session (per-event drop policy: bad-line skips, ordering violations,
103    ///   duplicate ids). The non-bad events landed.
104    /// - `Skipped` - the adapter couldn't extract a Session header from this
105    ///   file at all (empty `.jsonl`, header corruption). Nothing written.
106    /// - `Rejected` - the validator rejected the session at flush time on a
107    ///   Session-level invariant (`source_agent` / `project` immutability).
108    ///   The substream is dropped wholesale. This is the rare case where the
109    ///   *whole* session is lost; for everything else use `Partial`.
110    #[derive(Debug, Clone)]
111    pub enum SyncStatus {
112        Ok,
113        Partial {
114            dropped_events: usize,
115            /// First drop's error message; subsequent drops counted, not
116            /// retained. Full detail at `-vv` (debug) verbosity.
117            first_drop_reason: Option<String>,
118        },
119        Skipped {
120            reason: String,
121        },
122        Rejected {
123            reason: String,
124        },
125        /// Per-session staleness skip (spec.md#adapter-integrity-event-ordering): adapter short-circuited
126        /// the file decode because `mtime < MAX(messages.timestamp)`.
127        Fresh,
128        /// File produced no importable session (empty `.jsonl`, sidecar-only
129        /// rows, or an unextractable header). Benign: counted in
130        /// `skipped_empty`, never an error or a drop.
131        Empty,
132    }
133
134    #[derive(Debug, Default)]
135    struct InFlight {
136        project: Option<String>,
137        session_id: String,
138        messages: usize,
139        /// Events the adapter dropped mid-stream (skip-bad-line) that belong
140        /// to this in-flight session. Summed with the validator's per-event
141        /// drops at flush time to compute the final `SyncStatus::Partial`
142        /// count.
143        dropped_events: usize,
144        first_drop_reason: Option<String>,
145        /// The `index` value used when the Session event was pushed to the
146        /// validator. After batched flush, `RowOutcome.index` lets us match
147        /// per-session outcomes back to the originating session.
148        session_index: usize,
149    }
150
151    /// One session that has been fully observed but whose write hasn't
152    /// completed yet (queued in the validator's batched-flush buffer).
153    /// Emitted as `SyncEvent::SessionDone` after the corresponding flush
154    /// returns its outcomes.
155    #[derive(Debug)]
156    struct PendingDone {
157        project: Option<String>,
158        session_id: String,
159        messages: usize,
160        dropped_events: usize,
161        first_drop_reason: Option<String>,
162        session_index: usize,
163    }
164
165    /// Batch size used by the adapter ingest loop: flush every N completed
166    /// substreams to amortize per-commit cost. 100 is the value validated in
167    /// `benches/ingest_bench.rs` against the measured profile (substream
168    /// flushes were 78-88% of wall time at batch=1; ~25x fewer commits at
169    /// batch=100 closes most of that gap). Memory bound: ~N x (avg events
170    /// per session) staged in RAM, ~tens of MB at this scale.
171    const ADAPTER_FLUSH_BATCH: usize = 100;
172
173    /// Drain `adapter.events()` into `store`, accumulating an [`IngestSummary`]
174    /// and reporting progress through `on_event`. The adapter path is
175    /// CLI-driven (`pond sync`) and reports aggregates, not per-row results -
176    /// the wire-level [`pond_ingest`] handler keeps the per-row contract for
177    /// HTTP clients.
178    ///
179    /// Undecodable session substreams are skipped, not warned: the design
180    /// contract (no silent drops) is met by surfacing each skip through
181    /// `on_event` as [`SyncStatus::Skipped`]. The tracing line stays available
182    /// at DEBUG for deep-debug; default verbosity is silent.
183    pub async fn ingest_adapter<F>(
184        store: &Store,
185        adapter: &dyn Adapter,
186        oracle: &dyn SkipOracle,
187        mut on_event: F,
188    ) -> Result<IngestSummary>
189    where
190        F: FnMut(SyncEvent),
191    {
192        let mut summary = IngestSummary::default();
193        let truncations_before = crate::adapter::extract::truncated_values_count();
194        // Discovery is best-effort: a failure (no read perm, bad config)
195        // still lets the bar run as a rolling counter. We surface the count
196        // upfront when we can; otherwise the bar uses `set_length(0)`.
197        let total = adapter
198            .discover()
199            .await
200            .map_err(|error| tracing::debug!(%error, "adapter discover failed"))
201            .ok();
202        on_event(SyncEvent::Discovered { total });
203
204        let mut events = adapter.events_with(oracle);
205        let mut validator = IngestValidator::default();
206        // Adapter events have no stable input index (they stream from disk);
207        // assign a monotonic counter so RowOutcome.index stays unique even
208        // though the values aren't surfaced anywhere.
209        let mut index = 0usize;
210        let mut in_flight: Option<InFlight> = None;
211        // Sessions whose end-of-stream we've observed but whose write is
212        // still pending in the validator's batch buffer. Drained in FIFO
213        // order against `validator.flush()`'s outcome stream.
214        let mut pending_dones: std::collections::VecDeque<PendingDone> =
215            std::collections::VecDeque::new();
216        // Perf probe accumulators. Logged once at the end of the run at `-v`
217        // (info) verbosity so a single sync emits one tidy summary plus
218        // per-merge_insert lines from substrate. Visible only at INFO; never
219        // affects normal output.
220        let mut decode_total = std::time::Duration::ZERO;
221        let mut decode_count = 0u64;
222        let mut validator_total = std::time::Duration::ZERO;
223        let mut validator_count = 0u64;
224        let run_started = std::time::Instant::now();
225
226        loop {
227            let decode_start = std::time::Instant::now();
228            let next = events.next().await;
229            decode_total += decode_start.elapsed();
230            decode_count += 1;
231            let event = match next {
232                Some(event) => event,
233                None => break,
234            };
235            match event {
236                Ok(AdapterYield::Skipped {
237                    session_id,
238                    project,
239                    reason,
240                }) => {
241                    let status = match reason {
242                        SkipReason::Fresh => {
243                            summary.skipped_fresh += 1;
244                            SyncStatus::Fresh
245                        }
246                        SkipReason::Empty => {
247                            summary.skipped_empty += 1;
248                            SyncStatus::Empty
249                        }
250                        SkipReason::Unsupported(reason) => {
251                            summary.skipped_files += 1;
252                            SyncStatus::Skipped { reason }
253                        }
254                    };
255                    on_event(SyncEvent::SessionDone(SessionOutcome {
256                        project,
257                        session_id,
258                        messages: 0,
259                        status,
260                    }));
261                }
262                Ok(AdapterYield::Event(event)) => {
263                    // A new Session means the current one is being closed
264                    // out by the validator (moved to its `completed` buffer
265                    // for batched flush). Stage the PendingDone so we can
266                    // emit SessionDone with proper status after flush.
267                    if matches!(&event, IngestEvent::Session(_))
268                        && let Some(prev) = in_flight.take()
269                    {
270                        pending_dones.push_back(PendingDone {
271                            project: prev.project,
272                            session_id: prev.session_id,
273                            messages: prev.messages,
274                            dropped_events: prev.dropped_events,
275                            first_drop_reason: prev.first_drop_reason,
276                            session_index: prev.session_index,
277                        });
278                    }
279                    let event_index = index;
280                    match &event {
281                        IngestEvent::Session(session) => {
282                            in_flight = Some(InFlight {
283                                project: Some((*session.project).clone()),
284                                session_id: session.id.clone(),
285                                messages: 0,
286                                dropped_events: 0,
287                                first_drop_reason: None,
288                                session_index: event_index,
289                            });
290                        }
291                        IngestEvent::Message(_) => {
292                            if let Some(slot) = in_flight.as_mut() {
293                                slot.messages += 1;
294                            }
295                        }
296                        IngestEvent::Part(_) => {}
297                    }
298
299                    let validator_start = std::time::Instant::now();
300                    let push_outcomes = validator.push(store, index, event).await?;
301                    validator_total += validator_start.elapsed();
302                    validator_count += 1;
303                    // Per-event drops returned synchronously by push (ordering
304                    // / dup-id violations) attribute to the in-flight
305                    // session's drop count. Session-level errors (e.g. empty
306                    // source_agent) come back here too; we don't currently
307                    // distinguish them - they're rare and end up in
308                    // `summary.dropped_events`.
309                    for outcome in &push_outcomes {
310                        if matches!(outcome.status, OutcomeStatus::Error)
311                            && outcome.kind != "session"
312                            && let Some(slot) = in_flight.as_mut()
313                        {
314                            slot.dropped_events += 1;
315                            if slot.first_drop_reason.is_none() {
316                                slot.first_drop_reason =
317                                    outcome.error.as_ref().map(|err| err.message.clone());
318                            }
319                        }
320                    }
321                    summary.add_outcomes(&push_outcomes);
322                    index += 1;
323
324                    // Drain the batch periodically. The validator's
325                    // `pending_substreams()` count grows by one each time we
326                    // close a substream; once it hits the batch threshold we
327                    // commit them in one parallel 3-table merge_insert.
328                    if validator.pending_substreams() >= ADAPTER_FLUSH_BATCH {
329                        let flush_start = std::time::Instant::now();
330                        let (flush_outcomes, flush_counts) = validator.flush(store).await?;
331                        validator_total += flush_start.elapsed();
332                        validator_count += 1;
333                        // Counts come from the pre-existence sweep inside the
334                        // flush, not from per-row outcomes (which would
335                        // double-count if we also called `add_outcomes`).
336                        summary.add_outcomes_errors_only(&flush_outcomes);
337                        summary.add_batch(&flush_counts);
338                        drain_pending_dones(&mut pending_dones, &flush_outcomes, &mut on_event);
339                    }
340                }
341                Err(error) => {
342                    // Per-event drop semantics: the adapter's error is either
343                    // a pre-Session header failure (whole file unusable) or a
344                    // mid-session bad-line skip. The validator is not reset
345                    // on either case so subsequent good lines from the same
346                    // file still land.
347                    tracing::debug!(
348                        %error,
349                        "adapter event error (per-line drop by design)"
350                    );
351                    match in_flight.as_mut() {
352                        Some(slot) => {
353                            // Mid-session bad line. Charge one dropped event
354                            // to this session; the bar will render the per-
355                            // session summary at SessionDone time.
356                            slot.dropped_events += 1;
357                            if slot.first_drop_reason.is_none() {
358                                slot.first_drop_reason = Some(error.to_string());
359                            }
360                            summary.dropped_events += 1;
361                        }
362                        None => {
363                            // Pre-Session decode failure: no in-flight
364                            // session to attribute to. This is a whole-file
365                            // skip - surface it as a SessionDone with
366                            // session_id=None and status=Skipped.
367                            summary.skipped_files += 1;
368                            on_event(SyncEvent::SessionDone(SessionOutcome {
369                                project: None,
370                                session_id: None,
371                                messages: 0,
372                                status: SyncStatus::Skipped {
373                                    reason: error.to_string(),
374                                },
375                            }));
376                        }
377                    }
378                }
379            }
380        }
381
382        if let Some(prev) = in_flight.take() {
383            pending_dones.push_back(PendingDone {
384                project: prev.project,
385                session_id: prev.session_id,
386                messages: prev.messages,
387                dropped_events: prev.dropped_events,
388                first_drop_reason: prev.first_drop_reason,
389                session_index: prev.session_index,
390            });
391        }
392        let validator_start = std::time::Instant::now();
393        let (final_outcomes, final_counts) = validator.finish(store).await?;
394        validator_total += validator_start.elapsed();
395        validator_count += 1;
396        summary.add_outcomes_errors_only(&final_outcomes);
397        summary.add_batch(&final_counts);
398        drain_pending_dones(&mut pending_dones, &final_outcomes, &mut on_event);
399
400        summary.truncated_values = crate::adapter::extract::truncated_values_count()
401            .saturating_sub(truncations_before) as usize;
402
403        let total = run_started.elapsed();
404        let other = total
405            .saturating_sub(decode_total)
406            .saturating_sub(validator_total);
407        tracing::info!(
408            target: "pond::perf",
409            total_ms = total.as_millis() as u64,
410            decode_ms = decode_total.as_millis() as u64,
411            validator_ms = validator_total.as_millis() as u64,
412            other_ms = other.as_millis() as u64,
413            decode_calls = decode_count,
414            validator_calls = validator_count,
415            rows_inserted = summary.inserted as u64,
416            rows_matched = summary.matched as u64,
417            dropped_events = summary.dropped_events as u64,
418            dropped_sessions = summary.dropped_sessions as u64,
419            skipped_files = summary.skipped_files as u64,
420            skipped_fresh = summary.skipped_fresh as u64,
421            truncated_values = summary.truncated_values as u64,
422            "ingest_adapter complete"
423        );
424        Ok(summary)
425    }
426
427    /// Match the validator's flush outcomes back to the queued PendingDone
428    /// entries (FIFO; `RowOutcome.index` aligns with `PendingDone.session_index`).
429    /// Each matched PendingDone yields one `SyncEvent::SessionDone`. The queue
430    /// drains in order; if outcomes are missing for any (shouldn't happen with
431    /// a well-formed validator path), the SessionDone is emitted as Ok using
432    /// only the adapter-side drop count.
433    fn drain_pending_dones<F>(
434        queue: &mut std::collections::VecDeque<PendingDone>,
435        outcomes: &[RowOutcome],
436        on_event: &mut F,
437    ) where
438        F: FnMut(SyncEvent),
439    {
440        // Index session-kind outcomes by their `index` value so we can look
441        // them up by `session_index` regardless of relative ordering.
442        let mut session_outcome_by_index: std::collections::HashMap<usize, &RowOutcome> =
443            std::collections::HashMap::new();
444        for outcome in outcomes {
445            if outcome.kind == "session" {
446                session_outcome_by_index.insert(outcome.index, outcome);
447            }
448        }
449
450        while let Some(done) = queue.pop_front() {
451            let session_outcome = session_outcome_by_index.get(&done.session_index).copied();
452            let rejection_reason = session_outcome.and_then(|outcome| {
453                if matches!(outcome.status, OutcomeStatus::Error) {
454                    Some(
455                        outcome
456                            .error
457                            .as_ref()
458                            .map(|err| err.message.clone())
459                            .unwrap_or_else(|| "session-level rejection".to_owned()),
460                    )
461                } else {
462                    None
463                }
464            });
465            let status = if let Some(reason) = rejection_reason {
466                SyncStatus::Rejected { reason }
467            } else if done.dropped_events > 0 {
468                SyncStatus::Partial {
469                    dropped_events: done.dropped_events,
470                    first_drop_reason: done.first_drop_reason,
471                }
472            } else {
473                SyncStatus::Ok
474            };
475            on_event(SyncEvent::SessionDone(SessionOutcome {
476                project: done.project,
477                session_id: Some(done.session_id),
478                messages: done.messages,
479                status,
480            }));
481        }
482    }
483
484    /// The `pond_ingest` wire handler (spec.md#protocol): validate the transport
485    /// envelope, then drive the event batch through [`ingest_events`]. Transport
486    /// failures (bad protocol, unknown namespace, empty or oversized batch) fail
487    /// the whole request via the spec.md#protocol; per-event failures land
488    /// in the response's `results[]` with `status: "error"`.
489    pub async fn pond_ingest(store: &Store, request: IngestRequest) -> IngestEnvelope {
490        if let Err(envelope) = validate_protocol(request.protocol_version) {
491            return IngestEnvelope::Error(envelope);
492        }
493        if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
494            return IngestEnvelope::Error(envelope);
495        }
496        if request.events.is_empty() {
497            return IngestEnvelope::Error(map_error(crate::Error::validation_field(
498                "events must be a non-empty array",
499                "events",
500                Some(serde_json::json!([])),
501                Some("non-empty array".to_owned()),
502            )));
503        }
504        if request.events.len() > MAX_INGEST_EVENTS {
505            return IngestEnvelope::Error(map_error(crate::Error::validation_field(
506                format!("ingest batch exceeds the event cap: at most {MAX_INGEST_EVENTS} events"),
507                "events",
508                Some(serde_json::json!(request.events.len())),
509                Some(format!("at most {MAX_INGEST_EVENTS} events")),
510            )));
511        }
512
513        match ingest_events(store, request.events).await {
514            Ok(outcomes) => {
515                let mut accepted = 0;
516                let mut rejected = 0;
517                for outcome in &outcomes {
518                    match outcome.status {
519                        OutcomeStatus::Inserted | OutcomeStatus::Matched => accepted += 1,
520                        OutcomeStatus::Error => rejected += 1,
521                    }
522                }
523                let results = outcomes
524                    .into_iter()
525                    .map(outcome_to_result)
526                    .collect::<Vec<_>>();
527                IngestEnvelope::Success(IngestResponse {
528                    accepted,
529                    rejected,
530                    results,
531                })
532            }
533            Err(failure) => IngestEnvelope::Error(map_storage(failure)),
534        }
535    }
536
537    /// Drive a flat event batch through [`IngestValidator`], returning per-row
538    /// outcomes in input-array order. A substream that fails validation has
539    /// every one of its events tagged with [`OutcomeStatus::Error`] (the
540    /// offending event and any others in the same substream); ingest of later
541    /// sessions in the batch continues (spec.md#protocol).
542    pub async fn ingest_events(store: &Store, events: Vec<IngestEvent>) -> Result<Vec<RowOutcome>> {
543        let mut validator = IngestValidator::default();
544        let mut outcomes = Vec::with_capacity(events.len());
545        for (index, event) in events.into_iter().enumerate() {
546            let mut chunk = validator.push(store, index, event).await?;
547            outcomes.append(&mut chunk);
548        }
549        // HTTP wire path keeps using per-row outcomes for `IngestResult`;
550        // the batch counts are CLI-only.
551        let (mut tail, _counts) = validator.finish(store).await?;
552        outcomes.append(&mut tail);
553        outcomes.sort_by_key(|outcome| outcome.index);
554        Ok(outcomes)
555    }
556
557    fn outcome_to_result(outcome: RowOutcome) -> IngestResult {
558        let (status, error) = match (outcome.status, outcome.error) {
559            (OutcomeStatus::Inserted, _) => (IngestStatus::Inserted, None),
560            (OutcomeStatus::Matched, _) => (IngestStatus::Matched, None),
561            (OutcomeStatus::Error, error) => {
562                let body = error
563                    .map(|err| {
564                        let mut details = serde_json::Map::new();
565                        if let Some(field) = err.field {
566                            details.insert("field".to_owned(), serde_json::json!(field));
567                        }
568                        if let Some(reason) = err.reason {
569                            details.insert("reason".to_owned(), serde_json::json!(reason));
570                        }
571                        ErrorBody {
572                            code: ErrorCode::ValidationFailed,
573                            message: err.message,
574                            details: serde_json::Value::Object(details),
575                        }
576                    })
577                    .unwrap_or_else(|| ErrorBody {
578                        code: ErrorCode::ValidationFailed,
579                        message: "ingest failed".to_owned(),
580                        details: serde_json::json!({}),
581                    });
582                (IngestStatus::Error, Some(body))
583            }
584        };
585        IngestResult {
586            index: outcome.index,
587            kind: outcome.kind.to_owned(),
588            pk: outcome.pk,
589            status,
590            error,
591        }
592    }
593}
594
595pub use crate::sessions::{IngestEvent, IngestSummary, IngestValidator, search_text};
596pub use ingest_handler::{
597    MAX_INGEST_EVENTS, SessionOutcome, SyncEvent, SyncStatus, ingest_adapter, ingest_events,
598    pond_ingest,
599};
600
601mod export_handler {
602    //! `pond_export` (spec.md#protocol): walk every session in the store and
603    //! emit its canonical event stream as JSONL - one `IngestEvent` per line.
604    //! The output is byte-identical with what `pond ingest` / `pond_ingest`
605    //! accepts on input, so `export | ingest` is a portable backup loop.
606    //! Sessions are emitted in lexicographic id order; within each session,
607    //! messages run in `(timestamp, message_id)` order and each message's
608    //! parts immediately follow in `ordinal` order. Matches the
609    //! spec.md#adapter-integrity-event-ordering ordering contract so the output
610    //! re-imports without re-ordering.
611
612    use anyhow::{Context, Result};
613    use tokio::io::{AsyncWrite, AsyncWriteExt};
614
615    use crate::sessions::{IngestEvent, Store};
616
617    #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
618    pub struct ExportSummary {
619        pub sessions: usize,
620        pub messages: usize,
621        pub parts: usize,
622    }
623
624    pub async fn pond_export<W>(
625        store: &Store,
626        session_filter: Option<&str>,
627        writer: &mut W,
628    ) -> Result<ExportSummary>
629    where
630        W: AsyncWrite + Unpin,
631    {
632        let mut session_ids = match session_filter {
633            Some(id) => vec![id.to_owned()],
634            None => store.session_ids().await?,
635        };
636        session_ids.sort();
637
638        let mut summary = ExportSummary::default();
639        for session_id in session_ids {
640            let Some(stored) = store
641                .get_session(&session_id)
642                .await
643                .with_context(|| format!("export: failed to load session {session_id}"))?
644            else {
645                if session_filter.is_some() {
646                    anyhow::bail!("export: session not found: {session_id}");
647                }
648                continue;
649            };
650            write_event(writer, &IngestEvent::Session(stored.session)).await?;
651            summary.sessions += 1;
652            for message_with_parts in stored.messages {
653                write_event(writer, &IngestEvent::Message(message_with_parts.message)).await?;
654                summary.messages += 1;
655                for part in message_with_parts.parts {
656                    write_event(writer, &IngestEvent::Part(part)).await?;
657                    summary.parts += 1;
658                }
659            }
660        }
661        writer.flush().await.context("export: flush failed")?;
662        Ok(summary)
663    }
664
665    async fn write_event<W>(writer: &mut W, event: &IngestEvent) -> Result<()>
666    where
667        W: AsyncWrite + Unpin,
668    {
669        let line = serde_json::to_string(event).context("export: serialize event")?;
670        writer
671            .write_all(line.as_bytes())
672            .await
673            .context("export: write event")?;
674        writer
675            .write_all(b"\n")
676            .await
677            .context("export: write newline")?;
678        Ok(())
679    }
680}
681
682pub use export_handler::{ExportSummary, pond_export};
683
684mod restore_handler {
685    //! `restore_lineage` (spec.md#adapter-lineage-complete-restore): collect the named
686    //! session plus its direct subagent children for the `pond export session
687    //! --as` restore path. The spawn graph is one level deep; a collected
688    //! child that is itself a parent means a deeper graph, which is a typed
689    //! error - never a silently flattened restore.
690
691    use anyhow::{Context, Result, bail};
692
693    use crate::sessions::{SessionWithMessages, Store};
694
695    pub async fn restore_lineage(
696        store: &Store,
697        session_id: &str,
698    ) -> Result<Vec<SessionWithMessages>> {
699        let Some(parent) = store.get_session(session_id).await? else {
700            bail!("export: session not found: {session_id}");
701        };
702        let mut sessions = vec![parent];
703        for child in store.child_sessions(session_id).await? {
704            if !store.child_sessions(&child.id).await?.is_empty() {
705                bail!(
706                    "adapter-lineage-complete-restore supports one subagent level; session {} has child sessions",
707                    child.id
708                );
709            }
710            let child_id = child.id;
711            let stored = store
712                .get_session(&child_id)
713                .await?
714                .with_context(|| format!("export: child session disappeared: {child_id}"))?;
715            sessions.push(stored);
716        }
717        Ok(sessions)
718    }
719}
720
721pub use restore_handler::restore_lineage;
722
723mod get_handler {
724    use crate::{
725        sessions::{GetLookup, MessageViewParams, RetrievedMessage, SessionViewParams, Store},
726        wire::{
727            GetEnvelope, GetRequest, GetResponse, GetResult, GetSession, MessageView, PartSummary,
728            ResponseMode, ResponsePart, validate_protocol,
729        },
730    };
731
732    use super::{map_error, map_storage};
733
734    /// Project canonical retrieval data into the response DTO. In `verbatim` the
735    /// full parts ride `parts` and `text`/`content` are dropped - they would just
736    /// duplicate the inlined text part; otherwise the compact summary rides along
737    /// and full parts are elided.
738    fn to_message_view(message: RetrievedMessage, verbatim: bool) -> MessageView {
739        if verbatim {
740            return MessageView {
741                id: message.id,
742                role: message.role,
743                timestamp: message.timestamp,
744                text: None,
745                content: None,
746                parts_summary: Vec::new(),
747                parts: Some(
748                    message
749                        .parts
750                        .into_iter()
751                        .map(ResponsePart::from_part)
752                        .collect(),
753                ),
754            };
755        }
756        let parts_summary = message
757            .parts
758            .iter()
759            .filter_map(|part| PartSummary::for_kind(&part.kind))
760            .collect();
761        MessageView {
762            id: message.id,
763            role: message.role,
764            timestamp: message.timestamp,
765            text: message.text,
766            content: message.content,
767            parts_summary,
768            parts: None,
769        }
770    }
771
    /// Server response budget, sized to the declared
    /// `_meta["anthropic/maxResultSizeChars"]` cap (~200KB / ~50k tokens). The
    /// server stops adding messages (or parts) when the next would exceed it;
    /// `messages_remaining` / `target_parts_remaining` then signal pagination.
    ///
    /// Passed as `budget_bytes` to both the session-view and the message-view
    /// store lookups in this module.
    const BUDGET_BYTES: usize = 200_000;
777
778    pub async fn pond_get(store: &Store, request: GetRequest) -> GetEnvelope {
779        if let Err(error) = validate_protocol(request.protocol_version) {
780            return GetEnvelope::Error(error);
781        }
782        if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
783            return GetEnvelope::Error(envelope);
784        }
785
786        let response = match (&request.session_id, &request.message_id) {
787            (Some(session_id), None) => session_result(store, session_id, &request).await,
788            (None, Some(message_id)) => message_result(store, message_id, &request).await,
789            (Some(_), Some(_)) => Err(map_error(crate::Error::validation_field(
790                "session_id and message_id are mutually exclusive",
791                "message_id",
792                request.message_id.clone().map(serde_json::Value::String),
793                Some("omit when session_id is present".to_owned()),
794            ))),
795            (None, None) => Err(map_error(crate::Error::validation(
796                "one of session_id or message_id is required",
797            ))),
798        };
799
800        match response {
801            Ok(response) => GetEnvelope::Success(response),
802            Err(error) => GetEnvelope::Error(error),
803        }
804    }
805
806    /// Map a stale/unknown `after_id` to a `validation_failed` naming the fix
807    /// (spec.md#protocol); `anchor_of` describes the id the client should supply.
808    fn unknown_after_id(request: &GetRequest, anchor_of: &str) -> crate::wire::ErrorEnvelope {
809        map_error(crate::Error::validation_field(
810            "after_id not found (stale or mistyped continuation anchor)",
811            "after_id",
812            request.after_id.clone().map(serde_json::Value::String),
813            Some(format!("a {anchor_of} from a prior page of this read")),
814        ))
815    }
816
817    async fn session_result(
818        store: &Store,
819        session_id: &str,
820        request: &GetRequest,
821    ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
822        let params = SessionViewParams {
823            mode: request.response_mode,
824            after_id: request.after_id.as_deref(),
825            limit: request.limit,
826            budget_bytes: BUDGET_BYTES,
827            session_from: request.session_from,
828        };
829        let view = match store
830            .session_view(session_id, params)
831            .await
832            .map_err(map_storage)?
833        {
834            GetLookup::NotFound => {
835                return Err(map_error(crate::Error::not_found(
836                    "session",
837                    serde_json::json!(session_id),
838                    format!("session not found: {session_id}"),
839                )));
840            }
841            GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "message id")),
842            GetLookup::Found(view) => view,
843        };
844        let verbatim = matches!(request.response_mode, ResponseMode::Verbatim);
845        Ok(GetResponse {
846            session: GetSession::from_session(&view.session),
847            result: GetResult::Session {
848                messages: view
849                    .messages
850                    .into_iter()
851                    .map(|message| to_message_view(message, verbatim))
852                    .collect(),
853                messages_remaining: view.messages_remaining,
854            },
855        })
856    }
857
858    async fn message_result(
859        store: &Store,
860        message_id: &str,
861        request: &GetRequest,
862    ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
863        let params = MessageViewParams {
864            context_depth: request.context_depth,
865            mode: request.response_mode,
866            after_id: request.after_id.as_deref(),
867            limit: request.limit,
868            budget_bytes: BUDGET_BYTES,
869        };
870        let view = match store
871            .message_view(message_id, params)
872            .await
873            .map_err(map_storage)?
874        {
875            GetLookup::NotFound => {
876                return Err(map_error(crate::Error::not_found(
877                    "message",
878                    serde_json::json!(message_id),
879                    format!("message not found: {message_id}"),
880                )));
881            }
882            GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "part id")),
883            GetLookup::Found(view) => view,
884        };
885        // The target's body rides `target_parts` (paginated, full); carrying
886        // `text`/`content` on the header too would just duplicate it.
887        let target = MessageView {
888            id: view.target.id,
889            role: view.target.role,
890            timestamp: view.target.timestamp,
891            text: None,
892            content: None,
893            parts_summary: Vec::new(),
894            parts: None,
895        };
896        Ok(GetResponse {
897            session: GetSession::from_session(&view.session),
898            result: GetResult::Message {
899                target,
900                target_parts: view
901                    .target_parts
902                    .into_iter()
903                    .map(ResponsePart::from_part)
904                    .collect(),
905                target_parts_remaining: view.target_parts_remaining,
906                siblings: view
907                    .siblings
908                    .into_iter()
909                    .map(|sibling| to_message_view(sibling, false))
910                    .collect(),
911            },
912        })
913    }
914}
915
916pub use get_handler::pond_get;
917
918mod search_handler {
919    //! The `pond_search` handler: hybrid (vector + BM25) retrieval at message
920    //! granularity, with filter pushdown and session-grouped responses
921    //! (spec.md#search).
922
923    use crate::{
924        Clock, SystemClock,
925        embed::{Embedder, LazyEmbedder, format_query},
926        sessions::{MessageKey, MessageMeta, Store},
927        substrate::{Predicate, ScalarValue},
928        wire::{
929            ErrorEnvelope, PartSummary, ProjectFilter, Role, SearchEnvelope, SearchFilters,
930            SearchRequest, SearchResponse, SearchResult, SearchSession, validate_protocol,
931        },
932    };
933    use chrono::NaiveDate;
934    use std::collections::HashMap;
935
936    use super::{map_error, map_storage};
937
    /// Internal branching enum for the retrieval mode. Production callers
    /// never pick: the server decides hybrid-vs-FTS from embedder availability.
    /// `Vector` exists for operator tooling - selected via `pond search --mode`
    /// or the `mode_override` wire field.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum SearchMode {
        /// Run the FTS and vector retrievers concurrently and fuse their
        /// results.
        Hybrid,
        /// Full-text retrieval only; the embedder is never loaded.
        Fts,
        /// Vector (kNN) retrieval only: embed the query, then search.
        Vector,
    }
948
    /// A validated search request, as produced by `plan_search`.
    #[derive(Debug, Clone, PartialEq)]
    pub struct SearchPlan {
        /// Retrieval mode. `plan_search` stores the mode it was given; the
        /// search entry points overwrite it with the effective mode afterwards.
        pub mode: SearchMode,
        /// The request query with surrounding whitespace trimmed; never empty.
        pub query: String,
        /// Storage predicate built from `filters` by `build_filter`.
        pub filter: Predicate,
        /// The request's filters, kept as received.
        pub filters: SearchFilters,
        /// FTS candidate pool size: `limit * 5`, but at least 50.
        pub pool: usize,
        /// Vector candidate pool size: twice `pool`.
        pub vector_pool: usize,
        /// Requested result limit, clamped to `LIMIT_CAP`.
        pub limit: usize,
        /// Minimum score a hit must reach; copied from `filters.min_score`.
        pub min_score: f64,
    }
960
    /// Upper bound applied to the request `limit` when a search is planned.
    const LIMIT_CAP: usize = 200;
    /// Per-session match cap for searches that are not pinned to one session
    /// (a `session_id` filter widens the cap to the requested limit instead).
    const MAX_MATCHES_PER_SESSION: usize = 3;
    // NOTE(review): presumably the byte budget for one search response page;
    // its use is outside this section - confirm.
    const SEARCH_BUDGET_BYTES: usize = 60_000;
    /// Centered query-windowed body returned on every hit (spec.md#search).
    /// Calibrated for the agent-context budget: ~600 code points fits a typical
    /// match site without crowding the 10k-token `pond_get` page.
    const HIT_SNIPPET_CHARS: usize = 600;
    /// Sum of the two fusion weights below.
    const SCORE_DENOMINATOR: f64 = FTS_FUSION_WEIGHT + VECTOR_FUSION_WEIGHT;

    // Score-normalized hybrid fusion weights (spec.md#search). Per-arm
    // base_score (raw BM25 / raw cosine similarity) is min-max normalized
    // within the arm's pool, then the two arms are combined as
    // w_fts * norm_fts + w_vec * norm_vec. The 0.3:1 ratio comes from the
    // 2026-06-10 re-sweep (ops/search-benchmarks/bench.py) after the
    // vector arm switched from rank-norm to raw cosine: w_fts 0.2-0.3 ties
    // the old 0.135 rank-norm config on the 111-query paraphrase set
    // (S@3 67/111, sign test p=0.727) and beats it on the 12-query
    // false-negative regression set (S@3 10/12 vs 9/12). Symmetric across
    // arms - no per-query routing; cross-lingual queries are an agent-layer
    // concern (see the `pond_search` MCP description).
    const FTS_FUSION_WEIGHT: f64 = 0.3;
    const VECTOR_FUSION_WEIGHT: f64 = 1.0;
983
984    /// Run a hybrid or FTS-only search. The mode is server-determined - hybrid
985    /// when the store has any vectors, FTS-only otherwise. The embedder is
986    /// `LazyEmbedder`-loaded on the first hybrid/vector call, so FTS-only
987    /// corpora never pay the model load. The response has no top-level mode
988    /// field; retriever attribution stays in `explain_search_plan`.
989    ///
990    /// Must run on a multi-threaded Tokio runtime: hybrid mode embeds the query via
991    /// `block_in_place`, which panics on a `current_thread` runtime.
992    pub async fn pond_search(
993        store: &Store,
994        embedder: &LazyEmbedder,
995        request: SearchRequest,
996        search: &crate::config::SearchConfig,
997    ) -> SearchEnvelope {
998        match run_search(store, embedder, request, search, &SystemClock).await {
999            Ok(response) => SearchEnvelope::Success(response),
1000            Err(envelope) => SearchEnvelope::Error(envelope),
1001        }
1002    }
1003
1004    pub async fn explain_search_plan(
1005        store: &Store,
1006        embedder: &LazyEmbedder,
1007        request: SearchRequest,
1008        search: &crate::config::SearchConfig,
1009    ) -> Result<String, ErrorEnvelope> {
1010        let override_mode = request.mode_override.map(wire_mode_to_internal);
1011        let mut plan = plan_search(request, SearchMode::Fts)?;
1012        plan.mode = resolve_effective_mode(store, override_mode).await?;
1013        let mut out = String::new();
1014        if matches!(plan.mode, SearchMode::Fts | SearchMode::Hybrid) {
1015            let fts = store
1016                .explain_fts_plan(&plan.query, plan.pool, &plan.filter)
1017                .await
1018                .map_err(map_storage)?;
1019            out.push_str("fts:\n");
1020            out.push_str(&fts);
1021            out.push('\n');
1022        }
1023        if matches!(plan.mode, SearchMode::Vector | SearchMode::Hybrid) {
1024            let backend = load_embedder(embedder).await?;
1025            let vector = embed_query(backend.as_ref(), &plan.query)?;
1026            let vector_plan = store
1027                .explain_vector_plan(&vector, plan.vector_pool, &plan.filter, Some(search))
1028                .await
1029                .map_err(map_storage)?;
1030            out.push_str("vector:\n");
1031            out.push_str(&vector_plan);
1032            out.push('\n');
1033        }
1034        Ok(out)
1035    }
1036
1037    async fn run_search(
1038        store: &Store,
1039        embedder: &LazyEmbedder,
1040        request: SearchRequest,
1041        search: &crate::config::SearchConfig,
1042        _clock: &dyn Clock,
1043    ) -> Result<SearchResponse, ErrorEnvelope> {
1044        let override_mode = request.mode_override.map(wire_mode_to_internal);
1045        let mut plan = plan_search(request, SearchMode::Fts)?;
1046
1047        // Mode is server-determined unless the caller passed an explicit
1048        // override (operator tooling). `has_embeddings()` is the only
1049        // gate: hybrid when the store has any vectors, FTS-only when empty.
1050        plan.mode = resolve_effective_mode(store, override_mode).await?;
1051
1052        // A session_id filter pins one conversation, so root-keyed fusion
1053        // would collapse the whole response to a single hit. Key fusion by
1054        // message there and let the one session carry up to `limit` hits.
1055        let single_session = plan.filters.session_id.is_some();
1056        let fusion_key = if single_session {
1057            FusionKey::Message
1058        } else {
1059            FusionKey::SessionRoot
1060        };
1061
1062        // The scope count (spec.md#search-absence-honesty: how many searchable
1063        // messages the filters left in scope, so "no relevant hits" is
1064        // distinguishable from "my filters excluded everything") overlaps
1065        // retrieval instead of preceding it - serialized, its count_rows
1066        // round-trip would be pure added latency on every search, and round
1067        // trips are what object-store backends pay for.
1068        let candidates_fut = async {
1069            match plan.mode {
1070                SearchMode::Fts => {
1071                    let hits = store
1072                        .fts_search(&plan.query, plan.pool, &plan.filter)
1073                        .await
1074                        .map_err(map_storage)?;
1075                    Ok(normalize_fts(hits))
1076                }
1077                SearchMode::Hybrid => {
1078                    let backend = load_embedder(embedder).await?;
1079                    let vector = embed_query(backend.as_ref(), &plan.query)?;
1080                    // The two retrievers hit disjoint datasets (and disjoint mutexes),
1081                    // so run them concurrently rather than back-to-back.
1082                    let fts_fut = async {
1083                        store
1084                            .fts_search(&plan.query, plan.pool, &plan.filter)
1085                            .await
1086                            .map_err(map_storage)
1087                    };
1088                    let vector_fut = async {
1089                        store
1090                            .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
1091                            .await
1092                            .map_err(map_storage)
1093                    };
1094                    let (fts, vector_raw) = tokio::try_join!(fts_fut, vector_fut)?;
1095                    // Per-arm scores carry raw magnitude into fusion: BM25 for
1096                    // FTS, cosine similarity (`1 - distance`) for the vector arm.
1097                    // Fusion (`fuse_arms`) min-max normalizes each arm over its
1098                    // full pool and weights the arms by FTS_FUSION_WEIGHT and
1099                    // VECTOR_FUSION_WEIGHT. Magnitude (not rank position) is the
1100                    // load-bearing signal: rank-norm made a hit's score depend on
1101                    // pool size - and pools scale with `limit` - while raw cosine
1102                    // is stable across pool sizes (2026-06-10 benchmark).
1103                    let fts_entries: Vec<(MessageKey, f64)> = fts
1104                        .into_iter()
1105                        .map(|(key, score)| (key, f64::from(score)))
1106                        .collect();
1107                    let vector_entries: Vec<(MessageKey, f64)> = vector_raw
1108                        .into_iter()
1109                        .map(|(key, distance)| (key, 1.0 - f64::from(distance)))
1110                        .collect();
1111                    let lists = [
1112                        RankedList {
1113                            retriever: RetrieverKind::Fts,
1114                            entries: fts_entries,
1115                            weight: FTS_FUSION_WEIGHT,
1116                        },
1117                        RankedList {
1118                            retriever: RetrieverKind::Vector,
1119                            entries: vector_entries,
1120                            weight: VECTOR_FUSION_WEIGHT,
1121                        },
1122                    ];
1123                    Ok(fuse_arms(&lists, fusion_key)
1124                        .into_iter()
1125                        .map(|hit| Candidate {
1126                            session_id: hit.key.session_id,
1127                            message_id: hit.key.message_id,
1128                            base_score: hit.score,
1129                        })
1130                        .collect())
1131                }
1132                // Vector-only branch. Reached when the caller set
1133                // `mode_override = Vector` (operator tooling / benchmark
1134                // harness): embed `plan.query` and run kNN.
1135                SearchMode::Vector => {
1136                    let backend = load_embedder(embedder).await?;
1137                    let vector = embed_query(backend.as_ref(), &plan.query)?;
1138                    let vector_raw = store
1139                        .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
1140                        .await
1141                        .map_err(map_storage)?;
1142                    Ok(normalize_vector(vector_raw))
1143                }
1144            }
1145        };
1146        let scope_fut = async {
1147            store
1148                .searchable_in_scope(&plan.filter)
1149                .await
1150                .map_err(map_storage)
1151        };
1152        let (candidates, searchable_in_scope) = tokio::try_join!(candidates_fut, scope_fut)?;
1153
1154        if candidates.is_empty() {
1155            return Ok(empty_response(searchable_in_scope));
1156        }
1157
1158        // Hydrate hit metadata (timestamp, role, project, preview source) from
1159        // the `messages` table - the retrievers return only message keys.
1160        let keys = candidates
1161            .iter()
1162            .map(|candidate| MessageKey {
1163                session_id: candidate.session_id.clone(),
1164                message_id: candidate.message_id.clone(),
1165            })
1166            .collect::<Vec<_>>();
1167        let metas = store
1168            .message_metas_by_keys(&keys)
1169            .await
1170            .map_err(map_storage)?;
1171        let meta_index = metas
1172            .iter()
1173            .map(|meta| ((meta.session_id.as_str(), meta.message_id.as_str()), meta))
1174            .collect::<std::collections::HashMap<_, _>>();
1175
1176        let mut scored = Vec::with_capacity(candidates.len());
1177        for candidate in candidates {
1178            let Some(meta) =
1179                meta_index.get(&(candidate.session_id.as_str(), candidate.message_id.as_str()))
1180            else {
1181                continue;
1182            };
1183            let score = candidate.base_score;
1184            if score < plan.min_score {
1185                continue;
1186            }
1187            scored.push(ScoredHit {
1188                meta: (*meta).clone(),
1189                score,
1190            });
1191        }
1192        scored.sort_by(|left, right| {
1193            right
1194                .score
1195                .partial_cmp(&left.score)
1196                .unwrap_or(std::cmp::Ordering::Equal)
1197                .then_with(|| left.meta.session_id.cmp(&right.meta.session_id))
1198                .then_with(|| left.meta.message_id.cmp(&right.meta.message_id))
1199        });
1200
1201        let matched_total = scored.len();
1202        // Session-scoped searches return one session; the per-session match
1203        // cap would silently discard most of what the caller asked for, so
1204        // it widens to the requested limit there.
1205        let matches_cap = if single_session {
1206            plan.limit
1207        } else {
1208            MAX_MATCHES_PER_SESSION
1209        };
1210        let sessions = build_sessions(store, &scored, &plan.query, matches_cap).await?;
1211        page_sessions(sessions, matched_total, searchable_in_scope, &plan)
1212    }
1213
1214    /// Pick the retrieval mode. An explicit caller override (operator tooling
1215    /// via the wire `mode_override` field / `pond search --mode`) wins; in its
1216    /// absence the server runs hybrid when the store has any vectors and
1217    /// FTS-only when it doesn't (`has_embeddings()` is the only gate).
1218    async fn resolve_effective_mode(
1219        store: &Store,
1220        override_mode: Option<SearchMode>,
1221    ) -> Result<SearchMode, ErrorEnvelope> {
1222        if let Some(mode) = override_mode {
1223            return Ok(mode);
1224        }
1225        let has = store.has_embeddings().await.map_err(map_storage)?;
1226        Ok(if has {
1227            SearchMode::Hybrid
1228        } else {
1229            SearchMode::Fts
1230        })
1231    }
1232
1233    /// Materialize the lazy embedder on the first hybrid/vector branch that
1234    /// needs it. Wraps the load error in an Internal envelope - candle/Metal
1235    /// load failure is a server-side problem, not a caller error.
1236    async fn load_embedder(
1237        embedder: &LazyEmbedder,
1238    ) -> Result<std::sync::Arc<dyn Embedder>, ErrorEnvelope> {
1239        embedder.get().await.map_err(|error| {
1240            map_error(crate::Error::internal(format!(
1241                "embedder load failed: {error}"
1242            )))
1243        })
1244    }
1245
1246    pub fn plan_search(
1247        request: SearchRequest,
1248        mode: SearchMode,
1249    ) -> Result<SearchPlan, ErrorEnvelope> {
1250        validate_protocol(request.protocol_version)?;
1251
1252        let _ns = super::resolve_namespace(request.namespace.as_deref())?;
1253
1254        let filters = request.filters;
1255        let query = request.query.trim().to_owned();
1256        if query.is_empty() {
1257            return Err(map_error(crate::Error::validation_field(
1258                "query must be non-empty after trim",
1259                "query",
1260                Some(serde_json::json!(request.query)),
1261                Some("non-empty string after trim".to_owned()),
1262            )));
1263        }
1264        if request.limit == 0 {
1265            return Err(map_error(crate::Error::validation_field(
1266                "limit must be at least 1",
1267                "limit",
1268                Some(serde_json::json!(request.limit)),
1269                Some("integer >= 1".to_owned()),
1270            )));
1271        }
1272        let limit = request.limit.min(LIMIT_CAP);
1273        let min_score = filters.min_score;
1274        let filter = build_filter(&filters)?;
1275        // Retriever candidate pool: wider than `limit` so the fuser has
1276        // material to merge.
1277        let pool = limit.saturating_mul(5).max(50);
1278        Ok(SearchPlan {
1279            mode,
1280            query,
1281            filter,
1282            filters,
1283            pool,
1284            vector_pool: pool.saturating_mul(2),
1285            limit,
1286            min_score,
1287        })
1288    }
1289
1290    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1291    pub enum RetrieverKind {
1292        Vector,
1293        Fts,
1294    }
1295
1296    impl RetrieverKind {
1297        fn as_wire(self) -> &'static str {
1298            match self {
1299                Self::Vector => "vector",
1300                Self::Fts => "fts",
1301            }
1302        }
1303    }
1304
    /// A retriever-ranked arm: scored hits best-first plus the arm's fusion
    /// weight. `entries` carries each hit's raw `base_score` (BM25 for FTS,
    /// cosine-similarity for the vector arm); the fuser min-max normalizes
    /// those within the arm before combining across arms. `weight` is the
    /// per-arm scalar that controls relative arm influence after
    /// normalization.
    pub struct RankedList {
        /// The retriever that produced this arm.
        pub retriever: RetrieverKind,
        /// `(message key, raw score)` pairs as returned by the retriever.
        pub entries: Vec<(MessageKey, f64)>,
        /// Multiplier applied to this arm's normalized scores during fusion.
        pub weight: f64,
    }
1316
1317    /// Wire-to-internal mode mapping. Kept here so the wire type stays free of
1318    /// handler-internal concerns and the conversion is one obvious place.
1319    fn wire_mode_to_internal(wire: crate::wire::SearchModeWire) -> SearchMode {
1320        match wire {
1321            crate::wire::SearchModeWire::Fts => SearchMode::Fts,
1322            crate::wire::SearchModeWire::Vector => SearchMode::Vector,
1323            crate::wire::SearchModeWire::Hybrid => SearchMode::Hybrid,
1324        }
1325    }
1326
    /// One merged hybrid-fusion result.
    #[derive(Debug, Clone, PartialEq)]
    pub struct FusedHit {
        /// Representative message for the fused group.
        pub key: MessageKey,
        /// Fused score of the group.
        pub score: f64,
        // NOTE(review): presumably the wire names of the retrievers that
        // matched this group - where it is filled is outside this section;
        // confirm.
        pub matched_via: Vec<String>,
    }
1334
1335    /// Conversation root for grouping and per-arm dedup. The Claude Code adapter
1336    /// stores sub-agent sessions under ids of the form `<parent-uuid>/agent-<id>`;
1337    /// stripping at the first `/` yields the user-facing conversation root. Other
1338    /// adapters (codex, etc.) use ids without `/` and pass through unchanged.
1339    fn session_root(session_id: &str) -> &str {
1340        match session_id.find('/') {
1341            Some(idx) => &session_id[..idx],
1342            None => session_id,
1343        }
1344    }
1345
    /// How fusion groups candidates. `SessionRoot` is the default: cross-arm
    /// agreement is credited at the conversation level, one fused hit per
    /// root. `Message` is for session-scoped searches (a `session_id`
    /// filter): every candidate shares the root there, so root keying would
    /// collapse the whole response to exactly one hit (spec.md#search).
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum FusionKey {
        /// Group by the session id's conversation root (`session_root`).
        SessionRoot,
        /// Group by the `(session_id, message_id)` pair, so every message is
        /// its own group.
        Message,
    }
1356
1357    /// Score-normalized hybrid fusion: for each arm, the surviving (post
1358    /// intra-arm dedup-by-key) raw `base_score` values are min-max normalized
1359    /// to [0, 1] across that arm's pool, then summed across arms weighted by
1360    /// `RankedList.weight`. The representative message for a fused group is
1361    /// the one with the highest single weighted contribution - the message
1362    /// the strongest arm actually matched - so the displayed snippet reflects
1363    /// why the group ranked where it did. Ties break on the representative
1364    /// key for determinism (spec.md#search).
1365    ///
1366    /// Why session-root keying by default instead of `(session_id,
1367    /// message_id)`: a long session whose best FTS message and best vector
1368    /// message differ would otherwise appear as two separate fused hits,
1369    /// neither getting the cross-arm validation bonus. Keying on the root
1370    /// credits cross-arm agreement at the conversation level - which is what
1371    /// the user sees.
1372    ///
1373    /// Why per-arm score normalization instead of RRF: RRF discards score
1374    /// magnitude (rank 1 contributes the same whether the vector cosine is
1375    /// 0.85 or 0.55), and on paraphrase queries that magnitude is the load-
1376    /// bearing signal. See `ops/search-benchmarks/bench.py` and
1377    /// `docs/researches/embeddings.md`.
1378    pub fn fuse_arms(lists: &[RankedList], key_by: FusionKey) -> Vec<FusedHit> {
1379        struct Group {
1380            score: f64,
1381            matched_via: Vec<String>,
1382            rep: MessageKey,
1383            rep_contribution: f64,
1384        }
1385        let group_key = |key: &MessageKey| match key_by {
1386            FusionKey::SessionRoot => session_root(&key.session_id).to_owned(),
1387            FusionKey::Message => format!("{}\u{0}{}", key.session_id, key.message_id),
1388        };
1389        let mut merged: std::collections::HashMap<String, Group> = std::collections::HashMap::new();
1390        for list in lists {
1391            if list.entries.is_empty() {
1392                continue;
1393            }
1394            // Min-max normalize across the FULL arm pool BEFORE dedup. The
1395            // benchmark replay (ops/search-benchmarks/) and the
1396            // production code MUST agree on the normalization basis;
1397            // dedupping first would narrow [lo, hi] to only the surviving
1398            // groups' scores and skew the normalized signal away from what
1399            // the benchmark reports. A degenerate arm where every hit ties
1400            // on raw score collapses to zero contribution; the other arm
1401            // then decides the order.
1402            let mut lo = f64::INFINITY;
1403            let mut hi = f64::NEG_INFINITY;
1404            for (_, raw) in &list.entries {
1405                if *raw < lo {
1406                    lo = *raw;
1407                }
1408                if *raw > hi {
1409                    hi = *raw;
1410                }
1411            }
1412            let range = hi - lo;
1413            // Intra-arm dedup keeps the highest-scoring message each arm
1414            // returned for a given group: without it a long session whose
1415            // top-N hits all share a root would crowd out cross-arm signal
1416            // from other sessions. (Under Message keying every entry is its
1417            // own group, so this is a no-op there.)
1418            let mut seen_in_arm: std::collections::HashSet<String> =
1419                std::collections::HashSet::new();
1420            for (key, raw) in &list.entries {
1421                let group = group_key(key);
1422                if !seen_in_arm.insert(group.clone()) {
1423                    continue;
1424                }
1425                let norm = if range > 0.0 { (raw - lo) / range } else { 0.0 };
1426                let contribution = list.weight * norm;
1427                let entry = merged.entry(group).or_insert_with(|| Group {
1428                    score: 0.0,
1429                    matched_via: Vec::new(),
1430                    rep: key.clone(),
1431                    rep_contribution: f64::NEG_INFINITY,
1432                });
1433                entry.score += contribution;
1434                entry.matched_via.push(list.retriever.as_wire().to_owned());
1435                if contribution > entry.rep_contribution {
1436                    entry.rep = key.clone();
1437                    entry.rep_contribution = contribution;
1438                }
1439            }
1440        }
1441        let mut hits = merged
1442            .into_values()
1443            .map(|group| FusedHit {
1444                key: group.rep,
1445                score: group.score,
1446                matched_via: group.matched_via,
1447            })
1448            .collect::<Vec<_>>();
1449        hits.sort_by(|left, right| {
1450            right
1451                .score
1452                .partial_cmp(&left.score)
1453                .unwrap_or(std::cmp::Ordering::Equal)
1454                .then_with(|| left.key.cmp(&right.key))
1455        });
1456        hits
1457    }
1458
    /// Minimum query-term length, counted in `char`s, for a term to be
    /// considered "informative" for snippet anchoring. Shorter terms ("how",
    /// "the", "is", "my", "at") attract the `.min()` anchor to offset-near-0
    /// because they occur very early in any text, masking the real match
    /// site. `query_snippet` ignores this floor when every query term falls
    /// below it.
    const ANCHOR_MIN_TERM_CHARS: usize = 4;
1464
1465    /// Build a hit's `text` payload (spec.md#search): the message body when
1466    /// it fits within the snippet window, otherwise a query-windowed slice
1467    /// centered on the first informative term. Bounded for the agent-context
1468    /// budget; callers fetch the full body via `pond_get`.
1469    pub fn hit_payload(text: &str, query: &str) -> String {
1470        let chars_len = text.chars().count();
1471        if chars_len <= HIT_SNIPPET_CHARS {
1472            return text.to_owned();
1473        }
1474        query_snippet(text, query)
1475    }
1476
1477    /// A snippet windowed around the first informative query term found in
1478    /// `text`, capped at [`HIT_SNIPPET_CHARS`] code points. Falls back to the
1479    /// text head when no term matches.
1480    ///
1481    /// Terms shorter than [`ANCHOR_MIN_TERM_CHARS`] are excluded from anchor
1482    /// selection because they pull the window to offset-0 (a snippet audit on
1483    /// the live corpus found ~25-30% of conversational queries had their
1484    /// anchor degraded by short stop-word-like terms like "how", "the", "my").
1485    /// If every term is short, the filter is bypassed.
1486    ///
1487    /// TODO(snippet-anchor): reassess for vector-only hits (paraphrase queries
1488    /// where no literal term matches): the fallback to offset-0 is OK but not
1489    /// great. Possible upgrades: ngram match overlap, or
1490    /// skip-window-around-most-distinctive-substring. See snippet audit in
1491    /// tier-0 findings.
1492    fn query_snippet(text: &str, query: &str) -> String {
1493        let lower_text = text.to_lowercase();
1494        let terms: Vec<String> = query
1495            .split_whitespace()
1496            .filter(|term| !term.is_empty())
1497            .map(str::to_lowercase)
1498            .collect();
1499        let any_informative = terms
1500            .iter()
1501            .any(|term| term.chars().count() >= ANCHOR_MIN_TERM_CHARS);
1502        let hit = terms
1503            .iter()
1504            .filter(|term| !any_informative || term.chars().count() >= ANCHOR_MIN_TERM_CHARS)
1505            .filter_map(|term| lower_text.find(term.as_str()))
1506            .min();
1507        let chars: Vec<char> = text.chars().collect();
1508        // `find` returned a byte offset into the lowercased copy; index that
1509        // copy, not `text` - lowercasing can change byte length, so the offset
1510        // is not necessarily a valid char boundary in the original.
1511        let center = hit
1512            .map(|byte| lower_text[..byte].chars().count())
1513            .unwrap_or(0);
1514        let half = HIT_SNIPPET_CHARS / 2;
1515        let start = center.saturating_sub(half);
1516        let end = (start + HIT_SNIPPET_CHARS).min(chars.len());
1517        let start = end.saturating_sub(HIT_SNIPPET_CHARS);
1518        // Truncation markers carry the omitted-char counts so the agent knows
1519        // this is a windowed slice and roughly how much it's missing; the hit's
1520        // `message_id` is the handle to fetch the rest via `pond_get`.
1521        let mut snippet = String::new();
1522        if start > 0 {
1523            snippet.push_str(&format!("[{start} chars before] "));
1524        }
1525        snippet.extend(&chars[start..end]);
1526        if end < chars.len() {
1527            snippet.push_str(&format!(
1528                " [+{} more chars; pond_get for full]",
1529                chars.len() - end
1530            ));
1531        }
1532        snippet
1533    }
1534
    /// One retriever hit flattened to its message identity plus a score, as
    /// produced by `normalize_fts` and `normalize_vector`.
    struct Candidate {
        /// Copied from the hit's `MessageKey::session_id`.
        session_id: String,
        /// Copied from the hit's `MessageKey::message_id`.
        message_id: String,
        /// Retriever-specific score: `score / max` for FTS hits,
        /// `1 - distance` for vector hits.
        base_score: f64,
    }
1540
1541    struct ScoredHit {
1542        meta: MessageMeta,
1543        score: f64,
1544    }
1545
1546    impl ScoredHit {
1547        fn to_search_result(
1548            &self,
1549            query: &str,
1550            summaries: &HashMap<(String, String), Vec<PartSummary>>,
1551        ) -> Result<SearchResult, ErrorEnvelope> {
1552            let text = hit_payload(&self.meta.search_text, query);
1553            let role = match self.meta.role.as_str() {
1554                "system" => Role::System,
1555                "user" => Role::User,
1556                "assistant" => Role::Assistant,
1557                "tool" => Role::Tool,
1558                other => {
1559                    return Err(map_error(crate::Error::internal(format!(
1560                        "stored message has unknown role: {other}"
1561                    ))));
1562                }
1563            };
1564            // Only user hits earn a parts_summary (FilePart signal); see the
1565            // rationale in spec.md#search.
1566            let parts_summary = if matches!(role, Role::User) {
1567                summaries
1568                    .get(&(self.meta.session_id.clone(), self.meta.message_id.clone()))
1569                    .cloned()
1570                    .unwrap_or_default()
1571            } else {
1572                Vec::new()
1573            };
1574            Ok(SearchResult {
1575                message_id: self.meta.message_id.clone(),
1576                role,
1577                timestamp: self.meta.timestamp,
1578                text,
1579                score: normalize_score(self.score),
1580                parts_summary,
1581            })
1582        }
1583    }
1584
1585    fn normalize_score(score: f64) -> f64 {
1586        (score / SCORE_DENOMINATOR).clamp(0.0, 1.0)
1587    }
1588
1589    fn normalize_fts(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1590        let max = hits.iter().map(|(_, score)| *score).fold(0.0_f32, f32::max);
1591        hits.into_iter()
1592            .map(|(key, score)| Candidate {
1593                session_id: key.session_id,
1594                message_id: key.message_id,
1595                base_score: if max > 0.0 {
1596                    f64::from(score / max)
1597                } else {
1598                    0.0
1599                },
1600            })
1601            .collect()
1602    }
1603
1604    // Cosine similarity (`1 - distance`), matching the hybrid arm: the score
1605    // carries magnitude and is stable across pool sizes, instead of the old
1606    // rank-norm `1 - idx/n` whose value shifted whenever `limit` changed.
1607    fn normalize_vector(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1608        hits.into_iter()
1609            .map(|(key, distance)| Candidate {
1610                session_id: key.session_id,
1611                message_id: key.message_id,
1612                base_score: 1.0 - f64::from(distance),
1613            })
1614            .collect()
1615    }
1616
1617    fn embed_query(embedder: &dyn Embedder, query: &str) -> Result<Vec<f32>, ErrorEnvelope> {
1618        let prompt = format_query(query);
1619        // Model inference is synchronous and CPU-bound; `block_in_place` keeps
1620        // it from stalling other tasks on the async worker thread. (Requires a
1621        // multi-threaded runtime - see `pond_search`.)
1622        let vectors =
1623            tokio::task::block_in_place(|| embedder.embed(&[prompt])).map_err(|error_value| {
1624                map_error(crate::Error::internal(format!(
1625                    "failed to embed query: {error_value}"
1626                )))
1627            })?;
1628        vectors.into_iter().next().ok_or_else(|| {
1629            map_error(crate::Error::internal(
1630                "embedder returned no vector for query",
1631            ))
1632        })
1633    }
1634
1635    async fn build_sessions(
1636        store: &Store,
1637        scored: &[ScoredHit],
1638        query: &str,
1639        matches_cap: usize,
1640    ) -> Result<Vec<SearchSession>, ErrorEnvelope> {
1641        use std::collections::BTreeMap;
1642
1643        struct Acc {
1644            project: String,
1645            source_agent: String,
1646            matched_count: usize,
1647            matches: Vec<SearchResult>,
1648        }
1649        // Precompute part summaries for user-role hits, grouped by their actual
1650        // session id (a subagent hit's parts live under `root/agent-...`, not
1651        // the grouping root).
1652        let mut user_ids_by_session: BTreeMap<String, Vec<String>> = BTreeMap::new();
1653        for hit in scored {
1654            if hit.meta.role == "user" {
1655                user_ids_by_session
1656                    .entry(hit.meta.session_id.clone())
1657                    .or_default()
1658                    .push(hit.meta.message_id.clone());
1659            }
1660        }
1661        let mut summaries: HashMap<(String, String), Vec<PartSummary>> = HashMap::new();
1662        for (session_id, message_ids) in &user_ids_by_session {
1663            for (key, parts) in store
1664                .summary_parts_for_messages(session_id, message_ids)
1665                .await
1666                .map_err(map_storage)?
1667            {
1668                summaries.insert(
1669                    key,
1670                    parts
1671                        .iter()
1672                        .filter_map(|part| PartSummary::for_kind(&part.kind))
1673                        .collect(),
1674                );
1675            }
1676        }
1677
1678        let mut groups: BTreeMap<String, Acc> = BTreeMap::new();
1679        for hit in scored {
1680            let root = session_root(&hit.meta.session_id).to_owned();
1681            let entry = groups.entry(root).or_insert_with(|| Acc {
1682                project: hit.meta.project.clone(),
1683                source_agent: hit.meta.source_agent.clone(),
1684                matched_count: 0,
1685                matches: Vec::new(),
1686            });
1687            entry.matched_count += 1;
1688            entry.matches.push(hit.to_search_result(query, &summaries)?);
1689        }
1690
1691        let session_ids = groups.keys().cloned().collect::<Vec<_>>();
1692        let counts = store
1693            .session_message_counts(&session_ids)
1694            .await
1695            .map_err(map_storage)?;
1696
1697        let mut result = groups
1698            .into_iter()
1699            .map(|(session_id, mut acc)| {
1700                acc.matches.sort_by(|left, right| {
1701                    right
1702                        .score
1703                        .partial_cmp(&left.score)
1704                        .unwrap_or(std::cmp::Ordering::Equal)
1705                        .then_with(|| left.message_id.cmp(&right.message_id))
1706                });
1707                acc.matches.truncate(matches_cap);
1708                SearchSession {
1709                    session_messages_count: counts.get(&session_id).copied().unwrap_or_default(),
1710                    session_id,
1711                    project: acc.project,
1712                    source_agent: acc.source_agent,
1713                    matched_message_count: acc.matched_count,
1714                    matches: acc.matches,
1715                }
1716            })
1717            .collect::<Vec<_>>();
1718        result.sort_by(|left, right| {
1719            let left_score = left
1720                .matches
1721                .first()
1722                .map(|hit| hit.score)
1723                .unwrap_or_default();
1724            let right_score = right
1725                .matches
1726                .first()
1727                .map(|hit| hit.score)
1728                .unwrap_or_default();
1729            right_score
1730                .partial_cmp(&left_score)
1731                .unwrap_or(std::cmp::Ordering::Equal)
1732                .then_with(|| left.session_id.cmp(&right.session_id))
1733        });
1734        Ok(result)
1735    }
1736
1737    fn page_sessions(
1738        sessions: Vec<SearchSession>,
1739        matched_total: usize,
1740        searchable_in_scope: usize,
1741        plan: &SearchPlan,
1742    ) -> Result<SearchResponse, ErrorEnvelope> {
1743        let mut emitted = Vec::new();
1744        let mut used_bytes = 0usize;
1745        for session in sessions.iter() {
1746            if emitted.len() >= plan.limit {
1747                break;
1748            }
1749            let bytes = serde_json::to_string(session)
1750                .map_err(|error| {
1751                    map_error(crate::Error::internal(format!(
1752                        "failed to size search response session: {error}"
1753                    )))
1754                })?
1755                .len();
1756            if !emitted.is_empty() && used_bytes.saturating_add(bytes) > SEARCH_BUDGET_BYTES {
1757                break;
1758            }
1759            used_bytes = used_bytes.saturating_add(bytes);
1760            emitted.push(session.clone());
1761        }
1762
1763        // `has_more` warns the caller that `limit` or the byte budget cut the
1764        // ranked set short - raise `limit` (up to the cap) to see the rest.
1765        // There is no pagination cursor: the result set is relevance-ranked and
1766        // capped, so a wider `limit` dominates page-walking (spec.md#search).
1767        let has_more = emitted.len() < sessions.len();
1768
1769        Ok(SearchResponse {
1770            sessions: emitted,
1771            matched_total,
1772            searchable_in_scope,
1773            has_more,
1774        })
1775    }
1776
1777    /// Build the shared scalar filter predicate pushed into both retrievers.
1778    /// Both the FTS and vector retrievers scan `messages` (spec.md#datasets),
1779    /// so one predicate serves both.
1780    pub fn build_filter(filters: &SearchFilters) -> Result<Predicate, ErrorEnvelope> {
1781        let mut clauses = Vec::new();
1782
1783        match &filters.project {
1784            None => {}
1785            Some(ProjectFilter::Contains(value)) => {
1786                clauses.push(Predicate::LikeContains("project", value.clone()));
1787            }
1788            Some(ProjectFilter::Regex(pattern)) => {
1789                clauses.push(Predicate::Regex("project", pattern.clone()));
1790            }
1791        }
1792
1793        if let Some(session_id) = &filters.session_id {
1794            clauses.push(Predicate::Eq("session_id", session_id.clone().into()));
1795        }
1796        if let Some(source_agent) = &filters.source_agent {
1797            clauses.push(Predicate::Eq("source_agent", source_agent.clone().into()));
1798        }
1799        if let Some(from_date) = &filters.from_date {
1800            clauses.push(Predicate::Gte(
1801                "timestamp",
1802                ScalarValue::Raw(date_bound(from_date, "filters.from_date", false)?),
1803            ));
1804        }
1805        if let Some(to_date) = &filters.to_date {
1806            clauses.push(Predicate::Lte(
1807                "timestamp",
1808                ScalarValue::Raw(date_bound(to_date, "filters.to_date", true)?),
1809            ));
1810        }
1811
1812        // spec.md#search: subagent sessions (`source_agent` with a "/") are
1813        // excluded by default; an explicit session_id/source_agent filter means
1814        // the caller is already scoping, so the exclusion would only fight it.
1815        if !filters.include_subagents
1816            && filters.session_id.is_none()
1817            && filters.source_agent.is_none()
1818        {
1819            clauses.push(Predicate::Not(Box::new(Predicate::LikeContains(
1820                "source_agent",
1821                "/".to_owned(),
1822            ))));
1823        }
1824
1825        Ok(Predicate::And(clauses))
1826    }
1827
1828    /// Parse a `YYYY-MM-DD` filter date into a timestamp literal. `end_of_day`
1829    /// pushes `to_date` to the inclusive end of the day.
1830    fn date_bound(date: &str, field: &str, end_of_day: bool) -> Result<String, ErrorEnvelope> {
1831        NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| {
1832            map_error(crate::Error::validation_field(
1833                format!("{field} must be in YYYY-MM-DD format; got {date}"),
1834                field,
1835                Some(serde_json::json!(date)),
1836                Some("YYYY-MM-DD".to_owned()),
1837            ))
1838        })?;
1839        let time = if end_of_day { "23:59:59" } else { "00:00:00" };
1840        Ok(format!("timestamp '{date} {time}'"))
1841    }
1842
1843    fn empty_response(searchable_in_scope: usize) -> SearchResponse {
1844        SearchResponse {
1845            sessions: Vec::new(),
1846            matched_total: 0,
1847            searchable_in_scope,
1848            has_more: false,
1849        }
1850    }
1851
1852    #[cfg(test)]
1853    mod fusion_helpers_tests {
1854        #![allow(clippy::expect_used, clippy::unwrap_used)]
1855
1856        use super::*;
1857
1858        #[test]
1859        fn session_root_strips_agent_suffix_for_claude_code_subagents() {
1860            assert_eq!(
1861                session_root("94a50f23-1234-5678-9abc-def012345678"),
1862                "94a50f23-1234-5678-9abc-def012345678",
1863            );
1864            assert_eq!(
1865                session_root("94a50f23-1234-5678-9abc-def012345678/agent-abc123"),
1866                "94a50f23-1234-5678-9abc-def012345678",
1867            );
1868            // Multiple slashes: still cut at the first one (defensive).
1869            assert_eq!(session_root("root/a/b"), "root");
1870        }
1871
1872        #[test]
1873        fn fuse_arms_dedupes_intra_arm_by_session_root_and_credits_cross_arm() {
1874            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
1875                session_id: sid.to_owned(),
1876                message_id: mid.to_owned(),
1877            };
1878            // FTS pool (raw BM25): session-A msg-1 (10.0), session-A msg-2
1879            // (9.0, same root, dropped by intra-arm dedup), session-B msg-3
1880            // (6.0), session-A/agent-x msg-4 (5.0, same root as A, dropped).
1881            // Vector pool (raw cosine): session-B msg-7 (0.9, different
1882            // message than FTS's pick for B), session-A msg-9 (0.6).
1883            let fts = RankedList {
1884                retriever: RetrieverKind::Fts,
1885                entries: vec![
1886                    (mk("session-A", "msg-1"), 10.0),
1887                    (mk("session-A", "msg-2"), 9.0),
1888                    (mk("session-B", "msg-3"), 6.0),
1889                    (mk("session-A/agent-x", "msg-4"), 5.0),
1890                ],
1891                weight: 0.135,
1892            };
1893            let vec_arm = RankedList {
1894                retriever: RetrieverKind::Vector,
1895                entries: vec![
1896                    (mk("session-B", "msg-7"), 0.9),
1897                    (mk("session-A", "msg-9"), 0.6),
1898                ],
1899                weight: 1.0,
1900            };
1901            let merged = fuse_arms(&[fts, vec_arm], FusionKey::SessionRoot);
1902            // Output: one row per session_root after intra-arm dedup.
1903            assert_eq!(merged.len(), 2);
1904            // Per-arm min-max over the FULL pool BEFORE dedup:
1905            // FTS pool [10, 9, 6, 5]: range 5. A's first hit (10) -> 1.0;
1906            // B's first hit (6) -> 0.2.
1907            // Vector pool [0.9, 0.6]: range 0.3. B -> 1.0; A -> 0.0.
1908            // session-A: 0.135 * 1.0 + 1.0 * 0.0 = 0.135.
1909            // session-B: 0.135 * 0.2 + 1.0 * 1.0 = 1.027. B wins.
1910            assert_eq!(merged[0].key.session_id, "session-B");
1911            // Representative = highest single weighted contribution: Vector's
1912            // msg-7 (1.0 * 1.0) beats FTS's msg-3 (0.135 * 0.2) for B.
1913            assert_eq!(merged[0].key.message_id, "msg-7");
1914            assert_eq!(merged[0].matched_via, vec!["fts", "vector"]);
1915            assert_eq!(merged[1].key.session_id, "session-A");
1916            // For A the FTS contribution (0.135 * 1.0) beats Vector's
1917            // (1.0 * 0.0), so FTS's msg-1 is the representative.
1918            assert_eq!(merged[1].key.message_id, "msg-1");
1919            assert_eq!(merged[1].matched_via, vec!["fts", "vector"]);
1920        }
1921
1922        #[test]
1923        fn fuse_arms_message_key_keeps_per_message_hits_within_one_session() {
1924            // Session-scoped searches (session_id filter) fuse per message:
1925            // root keying would collapse everything below to one hit.
1926            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
1927                session_id: sid.to_owned(),
1928                message_id: mid.to_owned(),
1929            };
1930            let fts = RankedList {
1931                retriever: RetrieverKind::Fts,
1932                entries: vec![
1933                    (mk("session-A", "msg-1"), 10.0),
1934                    (mk("session-A", "msg-2"), 6.0),
1935                ],
1936                weight: 0.3,
1937            };
1938            let vec_arm = RankedList {
1939                retriever: RetrieverKind::Vector,
1940                entries: vec![
1941                    (mk("session-A", "msg-2"), 0.9),
1942                    (mk("session-A", "msg-3"), 0.6),
1943                ],
1944                weight: 1.0,
1945            };
1946            let merged = fuse_arms(&[fts, vec_arm], FusionKey::Message);
1947            assert_eq!(merged.len(), 3, "one fused hit per message, not per root");
1948            // FTS pool [10, 6]: msg-1 -> 1.0, msg-2 -> 0.0. Vector pool
1949            // [0.9, 0.6]: msg-2 -> 1.0, msg-3 -> 0.0. So msg-2 = 1.0 (both
1950            // arms), msg-1 = 0.3 (FTS only), msg-3 = 0.0 (vector only).
1951            assert_eq!(merged[0].key.message_id, "msg-2");
1952            assert_eq!(merged[0].matched_via, vec!["fts", "vector"]);
1953            assert_eq!(merged[1].key.message_id, "msg-1");
1954            assert_eq!(merged[1].matched_via, vec!["fts"]);
1955        }
1956
1957        #[test]
1958        fn fuse_arms_collapses_degenerate_tied_arm_to_zero_contribution() {
1959            // When every surviving hit in an arm shares the same raw score,
1960            // min-max normalization has zero range; that arm contributes 0
1961            // and the other arm decides the order on its own normalized
1962            // signal. This protects fusion from "flat" arms (e.g. an FTS arm
1963            // whose BM25 scores all tie at the same low magnitude).
1964            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
1965                session_id: sid.to_owned(),
1966                message_id: mid.to_owned(),
1967            };
1968            let fts = RankedList {
1969                retriever: RetrieverKind::Fts,
1970                entries: vec![(mk("session-A", "a"), 1.0), (mk("session-B", "b"), 1.0)],
1971                weight: 0.135,
1972            };
1973            let vec_arm = RankedList {
1974                retriever: RetrieverKind::Vector,
1975                entries: vec![(mk("session-A", "a"), 0.9), (mk("session-B", "b"), 0.3)],
1976                weight: 1.0,
1977            };
1978            let merged = fuse_arms(&[fts, vec_arm], FusionKey::SessionRoot);
1979            // Vector arm alone decides: A's normalized 1.0 beats B's 0.0.
1980            assert_eq!(merged[0].key.session_id, "session-A");
1981            assert!((merged[0].score - 1.0).abs() < 1e-9);
1982            assert!(merged[1].score.abs() < 1e-9);
1983        }
1984    }
1985}
1986
1987pub use search_handler::{
1988    FusedHit, FusionKey, RankedList, RetrieverKind, SearchMode, SearchPlan, build_filter,
1989    explain_search_plan, fuse_arms, hit_payload, plan_search, pond_search,
1990};
1991
1992#[cfg(test)]
1993mod tests {
1994    #![allow(clippy::expect_used, clippy::unwrap_used)]
1995
1996    use super::*;
1997    use crate::wire::{ProjectFilter, SearchFilters, SearchRequest};
1998    use chrono::Utc;
1999
2000    fn search_request(query: &str) -> SearchRequest {
2001        SearchRequest {
2002            protocol_version: crate::PROTOCOL_VERSION,
2003            namespace: Some("local".to_owned()),
2004            query: query.to_owned(),
2005            mode_override: None,
2006            filters: SearchFilters::default(),
2007            limit: 20,
2008        }
2009    }
2010
2011    fn key(session: &str, id: &str) -> crate::sessions::MessageKey {
2012        crate::sessions::MessageKey {
2013            session_id: session.to_owned(),
2014            message_id: id.to_owned(),
2015        }
2016    }
2017
2018    #[test]
2019    fn fuse_arms_fuses_retrievers_and_reports_provenance() {
2020        // Each session contributes at most one ballot per arm; cross-arm
2021        // agreement is credited per session_root, not per message_id.
2022        // Vector pool (raw cosine): a=0.9, b=0.7, c=0.5.
2023        // FTS pool (raw BM25):     b=10.0, a=8.0, d=4.0.
2024        let lists = [
2025            RankedList {
2026                retriever: RetrieverKind::Vector,
2027                entries: vec![
2028                    (key("session-a", "a"), 0.9),
2029                    (key("session-b", "b"), 0.7),
2030                    (key("session-c", "c"), 0.5),
2031                ],
2032                weight: 1.0,
2033            },
2034            RankedList {
2035                retriever: RetrieverKind::Fts,
2036                entries: vec![
2037                    (key("session-b", "b"), 10.0),
2038                    (key("session-a", "a"), 8.0),
2039                    (key("session-d", "d"), 4.0),
2040                ],
2041                weight: 0.135,
2042            },
2043        ];
2044        let merged = fuse_arms(&lists, FusionKey::SessionRoot);
2045
2046        // Vector normalized: a=1.0, b=0.5, c=0.0.
2047        // FTS normalized:    b=1.0, a=2/3, d=0.0.
2048        // session-a: 1.0 * 1.0 + 0.135 * 2/3 = 1.090
2049        // session-b: 1.0 * 0.5 + 0.135 * 1.0 = 0.635
2050        // session-c: 1.0 * 0.0 + 0 = 0
2051        // session-d: 0 + 0.135 * 0.0 = 0
2052        assert_eq!(merged[0].key.session_id, "session-a");
2053        assert_eq!(merged[1].key.session_id, "session-b");
2054        assert_eq!(merged[0].matched_via, vec!["vector", "fts"]);
2055        assert!(merged[0].score > merged[1].score);
2056
2057        let c = merged
2058            .iter()
2059            .find(|hit| hit.key.session_id == "session-c")
2060            .unwrap();
2061        assert_eq!(c.matched_via, vec!["vector"]);
2062        let d = merged
2063            .iter()
2064            .find(|hit| hit.key.session_id == "session-d")
2065            .unwrap();
2066        assert_eq!(d.matched_via, vec!["fts"]);
2067    }
2068
2069    #[test]
2070    fn hit_payload_returns_short_text_in_full() {
2071        let short = "a short message body";
2072        let text = hit_payload(short, "message");
2073        assert_eq!(text, short, "small text is returned as-is");
2074    }
2075
2076    #[test]
2077    fn hit_payload_windows_long_text_around_the_query_term() {
2078        // ~2400 chars: filler head, query term mid-body, filler tail.
2079        let body = format!("{}NEEDLE{}", "a".repeat(2000), "b".repeat(394));
2080        let text = hit_payload(&body, "needle");
2081        assert!(
2082            text.contains("NEEDLE"),
2083            "text is the match-windowed snippet: {text}"
2084        );
2085        // The <=600-char window is wrapped with truncation markers
2086        // ("[N chars before] " / " [+N more chars; pond_get for full]"); allow for their length.
2087        assert!(
2088            text.chars().count() <= 600 + 64,
2089            "snippet window is bounded by HIT_SNIPPET_CHARS plus markers: {}",
2090            text.chars().count()
2091        );
2092    }
2093
2094    #[test]
2095    fn hit_payload_snippet_survives_case_folding_that_changes_byte_length() {
2096        // `to_lowercase` of 'İ' is two code points, so the lowercased copy has
2097        // a different byte layout than the original. A query offset taken from
2098        // that copy must never be sliced into the original text.
2099        let body = format!("İÉÉÉ{}", "a".repeat(2100));
2100        let text = hit_payload(&body, "ééé");
2101        assert!(
2102            text.contains("ÉÉÉ"),
2103            "snippet windows on the matched term: {text}"
2104        );
2105    }
2106
2107    #[tokio::test]
2108    async fn restore_lineage_rejects_a_graph_nesting_deeper_than_one_level() {
2109        use crate::adapter::Extracted;
2110        use crate::sessions::Store;
2111        use crate::wire::{ProviderOptions, Session};
2112        use tempfile::TempDir;
2113
2114        let session = |id: &str, parent: Option<&str>| Session {
2115            id: id.to_owned(),
2116            parent_session_id: parent.map(str::to_owned),
2117            parent_message_id: None,
2118            source_agent: "claude-code".to_owned(),
2119            created_at: Utc::now(),
2120            project: Extracted::from_test_value("/tmp/pond".to_owned()),
2121            options: ProviderOptions::new(),
2122        };
2123
2124        let dir = TempDir::new().unwrap();
2125        let store = Store::open_local(dir.path()).await.unwrap();
2126        // A -> B -> C is a two-level spawn graph; spec 6.2 caps lineage at one.
2127        store
2128            .upsert_sessions(&[
2129                session("a", None),
2130                session("b", Some("a")),
2131                session("c", Some("b")),
2132            ])
2133            .await
2134            .unwrap();
2135
2136        // Restoring A reaches child B, then finds B is itself a parent of C.
2137        let err = restore_lineage(&store, "a").await.unwrap_err();
2138        assert!(
2139            err.to_string().contains("one subagent level"),
2140            "expected the deeper-graph error, got: {err}"
2141        );
2142
2143        // Restoring B is a clean one-level graph: B plus its single child C.
2144        let lineage = restore_lineage(&store, "b").await.unwrap();
2145        let ids: Vec<&str> = lineage.iter().map(|s| s.session.id.as_str()).collect();
2146        assert_eq!(ids, ["b", "c"]);
2147    }
2148
2149    #[test]
2150    fn build_filter_pushes_down_each_predicate_and_handles_empty() {
2151        let filters = SearchFilters {
2152            project: Some(ProjectFilter::Contains("/Users/me/pond".to_owned())),
2153            session_id: Some("01HXY".to_owned()),
2154            source_agent: Some("claude-code".to_owned()),
2155            from_date: Some("2026-01-01".to_owned()),
2156            to_date: Some("2026-05-01".to_owned()),
2157            min_score: 0.0,
2158            include_subagents: false,
2159        };
2160        let sql = build_filter(&filters).unwrap().to_lance();
2161        assert!(sql.contains("project LIKE '%/Users/me/pond%'"));
2162        assert!(sql.contains("session_id = '01HXY'"));
2163        assert!(sql.contains("source_agent = 'claude-code'"));
2164        assert!(sql.contains("timestamp >="));
2165        assert!(sql.contains("timestamp <="));
2166        // session_id/source_agent set => default subagent exclusion is skipped.
2167        assert!(!sql.contains("NOT ("));
2168
2169        assert_eq!(
2170            build_filter(&SearchFilters::default()).unwrap().to_lance(),
2171            "NOT (source_agent LIKE '%/%' ESCAPE '\\')",
2172        );
2173        assert_eq!(
2174            build_filter(&SearchFilters {
2175                include_subagents: true,
2176                ..SearchFilters::default()
2177            })
2178            .unwrap()
2179            .to_lance(),
2180            "",
2181        );
2182    }
2183
2184    #[test]
2185    fn build_filter_rejects_bad_date() {
2186        let bad_date = SearchFilters {
2187            from_date: Some("01-01-2026".to_owned()),
2188            ..SearchFilters::default()
2189        };
2190        assert!(build_filter(&bad_date).is_err());
2191    }
2192
2193    #[test]
2194    fn build_filter_contains_escapes_like_wildcards() {
2195        let filters = SearchFilters {
2196            project: Some(ProjectFilter::Contains("/Users/me/my_project".to_owned())),
2197            ..SearchFilters::default()
2198        };
2199        let sql = build_filter(&filters).unwrap().to_lance();
2200        // `_` is a LIKE wildcard and is everywhere in real paths; it must be escaped
2201        // so `my_project` matches literally, with an ESCAPE clause naming the char.
2202        assert!(
2203            sql.contains(r"my\_project"),
2204            "underscore must be escaped: {sql}"
2205        );
2206        assert!(
2207            sql.contains(r"ESCAPE '\'"),
2208            "predicate must declare the escape char: {sql}"
2209        );
2210    }
2211
2212    #[test]
2213    fn plan_search_shapes_request_for_each_planning_input() {
2214        let mut request = search_request("  vector memory  ");
2215        request.limit = 500;
2216        request.filters.min_score = 0.42;
2217        let plan = plan_search(request, SearchMode::Hybrid).unwrap();
2218        assert_eq!(plan.mode, SearchMode::Hybrid);
2219        assert_eq!(plan.query, "vector memory");
2220        assert_eq!(plan.limit, 200);
2221        assert_eq!(plan.pool, 1000);
2222        assert_eq!(plan.vector_pool, 2000);
2223        assert_eq!(plan.min_score, 0.42);
2224
2225        // Case 2: a tiny limit floors the pools so retrievers don't starve.
2226        let mut request = search_request("tiny pool");
2227        request.limit = 1;
2228        let plan = plan_search(request, SearchMode::Fts).unwrap();
2229        assert_eq!(plan.mode, SearchMode::Fts);
2230        assert_eq!(plan.limit, 1);
2231        assert_eq!(plan.pool, 50);
2232        assert_eq!(plan.vector_pool, 100);
2233
2234        // Case 3: filters get plumbed into the shared filter predicate.
2235        let mut request = search_request("filtered");
2236        request.filters.project = Some(ProjectFilter::Contains("/Users/me/pond".to_owned()));
2237        request.filters.source_agent = Some("claude-code".to_owned());
2238        let plan = plan_search(request, SearchMode::Fts).unwrap();
2239        let sql = plan.filter.to_lance();
2240        assert!(sql.contains("project LIKE"));
2241        assert!(sql.contains("source_agent = 'claude-code'"));
2242    }
2243
2244    #[test]
2245    fn plan_search_rejects_invalid_composition_before_execution() {
2246        let mut blank = search_request("   ");
2247        let error = plan_search(blank.clone(), SearchMode::Fts)
2248            .unwrap_err()
2249            .error;
2250        assert_eq!(error.code, crate::wire::ErrorCode::ValidationFailed);
2251        assert_eq!(error.details["field"], "query");
2252
2253        blank.query = "valid".to_owned();
2254        blank.limit = 0;
2255        let error = plan_search(blank.clone(), SearchMode::Fts)
2256            .unwrap_err()
2257            .error;
2258        assert_eq!(error.details["field"], "limit");
2259
2260        blank.limit = 1;
2261        blank.namespace = Some("remote".to_owned());
2262        let error = plan_search(blank, SearchMode::Fts).unwrap_err().error;
2263        assert_eq!(error.code, crate::wire::ErrorCode::NamespaceUnknown);
2264        assert_eq!(error.details["namespace"], "remote");
2265    }
2266}