1fn map_error(error: crate::Error) -> crate::wire::ErrorEnvelope {
2 error.into()
3}
4
/// Namespace path as an ordered list of segments; table ids are the segments followed
/// by the table name.
#[derive(Debug, Clone)]
pub struct NamespaceIdent(pub Vec<String>);

impl NamespaceIdent {
    /// The root namespace, which has no segments.
    pub fn root() -> Self {
        Self(Vec::new())
    }

    /// Builds a table identifier: every namespace segment, then `table_name`.
    pub fn as_table_id(&self, table_name: &str) -> Vec<String> {
        self.0
            .iter()
            .cloned()
            .chain(std::iter::once(table_name.to_owned()))
            .collect()
    }
}
22
23pub fn resolve_namespace(
27 namespace: Option<&str>,
28) -> Result<NamespaceIdent, crate::wire::ErrorEnvelope> {
29 match namespace {
30 None | Some(crate::wire::DEFAULT_NAMESPACE) => Ok(NamespaceIdent::root()),
31 Some(other) => Err(map_error(crate::Error::namespace_unknown(other))),
32 }
33}
34
35fn map_storage(error: anyhow::Error) -> crate::wire::ErrorEnvelope {
36 if let Some(conflict) = error.downcast_ref::<crate::substrate::ConflictExhausted>() {
39 return map_error(crate::Error::Conflict {
40 attempts: conflict.attempts,
41 });
42 }
43 map_error(crate::Error::Storage(error))
44}
45
46mod ingest_handler {
47 use anyhow::Result;
48 use tokio_stream::StreamExt;
49
50 use crate::{
51 adapter::{Adapter, AdapterYield, SkipOracle, SkipReason},
52 sessions::{IngestEvent, IngestSummary, IngestValidator, OutcomeStatus, RowOutcome, Store},
53 wire::{
54 ErrorBody, ErrorCode, IngestEnvelope, IngestRequest, IngestResponse, IngestResult,
55 IngestStatus, validate_protocol,
56 },
57 };
58
59 use super::{map_error, map_storage};
60
    /// Maximum number of events accepted in one `pond_ingest` request. Larger batches are
    /// rejected with a validation error before anything is ingested.
    pub const MAX_INGEST_EVENTS: usize = 1000;
63
    /// Progress callback payload emitted by `ingest_adapter`.
    #[derive(Debug, Clone)]
    pub enum SyncEvent {
        /// Sent once, first. `total` is the adapter's `discover()` result, or `None` when
        /// discovery failed.
        Discovered { total: Option<usize> },
        /// Final outcome for one session, skipped session, or unreadable adapter input.
        SessionDone(SessionOutcome),
    }
80
    /// Per-session result reported through `SyncEvent::SessionDone`.
    #[derive(Debug, Clone)]
    pub struct SessionOutcome {
        /// Project of the session. `None` when an adapter error arrived with no session in flight.
        pub project: Option<String>,
        /// Session id. `None` when an adapter error arrived with no session in flight.
        pub session_id: Option<String>,
        /// Number of `Message` events observed for the session (0 for skipped sessions).
        pub messages: usize,
        /// How the session ended.
        pub status: SyncStatus,
    }
94
    /// Final status of one session as computed by `ingest_adapter` / `drain_pending_dones`.
    #[derive(Debug, Clone)]
    pub enum SyncStatus {
        /// Session row accepted and no events were dropped.
        Ok,
        /// Session row accepted but some events were dropped: either rejected non-session
        /// rows or adapter errors while the session was in flight.
        Partial {
            /// Number of dropped events attributed to the session.
            dropped_events: usize,
            /// Reason recorded for the first dropped event, if one was available.
            first_drop_reason: Option<String>,
        },
        /// Adapter error with no session in flight; `reason` is the error text.
        Skipped {
            reason: String,
        },
        /// The session row itself came back from the validator with an error outcome.
        Rejected {
            reason: String,
        },
        /// Adapter skipped the session with `SkipReason::Fresh`.
        Fresh,
        /// Adapter skipped the session with `SkipReason::Empty`.
        Empty,
    }
130
    /// Bookkeeping for the session whose events `ingest_adapter` is currently receiving.
    #[derive(Debug, Default)]
    struct InFlight {
        project: Option<String>,
        session_id: String,
        // Count of `Message` events seen since the session's `Session` event.
        messages: usize,
        // Rejected non-session rows plus adapter errors attributed to this session.
        dropped_events: usize,
        // Reason for the first drop; later reasons are not kept.
        first_drop_reason: Option<String>,
        // Validator index of the `Session` event, used to find the session row's outcome.
        session_index: usize,
    }
147
    /// A closed session queued by `ingest_adapter`. Its `SessionDone` is emitted by
    /// `drain_pending_dones` once a flush/finish outcome batch is available. Fields mirror
    /// [`InFlight`].
    #[derive(Debug)]
    struct PendingDone {
        project: Option<String>,
        session_id: String,
        messages: usize,
        dropped_events: usize,
        first_drop_reason: Option<String>,
        // Validator index of the session row, matched against `RowOutcome::index`.
        session_index: usize,
    }
161
    /// `ingest_adapter` flushes the validator once `pending_substreams()` reaches this value.
    const ADAPTER_FLUSH_BATCH: usize = 100;
169
    /// Streams every event produced by `adapter` (filtered through `oracle`) into `store`
    /// via an [`IngestValidator`], reports progress through `on_event`, and returns the
    /// accumulated [`IngestSummary`].
    ///
    /// Callback sequence, as implemented below:
    /// - exactly one `SyncEvent::Discovered` first. Its `total` is the `Ok` value of
    ///   `adapter.discover()`, or `None` when discovery failed (the failure is only logged);
    /// - a `SyncEvent::SessionDone`, emitted immediately, for every `AdapterYield::Skipped`;
    /// - a `SyncEvent::SessionDone` with `SyncStatus::Skipped`, emitted immediately, for
    ///   every adapter error that arrives while no session is in flight;
    /// - a deferred `SyncEvent::SessionDone` for every ingested session, emitted after a
    ///   validator flush (or the final `finish`) so the session row's outcome can be
    ///   inspected by `drain_pending_dones`.
    ///
    /// A `pond::perf` `tracing::info!` line with timings and counters is logged on success.
    ///
    /// # Errors
    /// Propagates errors from `validator.push`, `validator.flush` and `validator.finish`.
    /// Adapter per-event errors are counted in the summary instead of being propagated.
    pub async fn ingest_adapter<F>(
        store: &Store,
        adapter: &dyn Adapter,
        oracle: &dyn SkipOracle,
        mut on_event: F,
    ) -> Result<IngestSummary>
    where
        F: FnMut(SyncEvent),
    {
        let mut summary = IngestSummary::default();
        // Counter snapshot; the difference after the run is reported as `truncated_values`.
        let truncations_before = crate::adapter::extract::truncated_values_count();
        let total = adapter
            .discover()
            .await
            .map_err(|error| tracing::debug!(%error, "adapter discover failed"))
            .ok();
        on_event(SyncEvent::Discovered { total });

        let mut events = adapter.events_with(oracle);
        let mut validator = IngestValidator::default();
        // Validator index of the next event. Only `AdapterYield::Event` consumes an index;
        // adapter errors and `Skipped` yields do not.
        let mut index = 0usize;
        // Session whose events are currently arriving. It is closed when the next
        // `Session` event shows up or the stream ends.
        let mut in_flight: Option<InFlight> = None;
        // Closed sessions waiting for a flush/finish before their `SessionDone` is emitted.
        let mut pending_dones: std::collections::VecDeque<PendingDone> =
            std::collections::VecDeque::new();
        // Perf accounting; only used by the final `pond::perf` log line.
        let mut decode_total = std::time::Duration::ZERO;
        let mut decode_count = 0u64;
        let mut validator_total = std::time::Duration::ZERO;
        let mut validator_count = 0u64;
        let run_started = std::time::Instant::now();

        loop {
            // Time spent waiting on the adapter stream counts as "decode". This includes
            // the final poll that returns `None`.
            let decode_start = std::time::Instant::now();
            let next = events.next().await;
            decode_total += decode_start.elapsed();
            decode_count += 1;
            let event = match next {
                Some(event) => event,
                None => break,
            };
            match event {
                Ok(AdapterYield::Skipped {
                    session_id,
                    project,
                    reason,
                }) => {
                    let status = match reason {
                        SkipReason::Fresh => {
                            summary.skipped_fresh += 1;
                            SyncStatus::Fresh
                        }
                        SkipReason::Empty => {
                            summary.skipped_empty += 1;
                            SyncStatus::Empty
                        }
                    };
                    // Emitted immediately; any in-flight session is left untouched.
                    on_event(SyncEvent::SessionDone(SessionOutcome {
                        project,
                        session_id,
                        messages: 0,
                        status,
                    }));
                }
                Ok(AdapterYield::Event(event)) => {
                    // A new `Session` event closes the previous in-flight session. Its
                    // `SessionDone` is deferred until the validator reports outcomes.
                    if matches!(&event, IngestEvent::Session(_))
                        && let Some(prev) = in_flight.take()
                    {
                        pending_dones.push_back(PendingDone {
                            project: prev.project,
                            session_id: prev.session_id,
                            messages: prev.messages,
                            dropped_events: prev.dropped_events,
                            first_drop_reason: prev.first_drop_reason,
                            session_index: prev.session_index,
                        });
                    }
                    let event_index = index;
                    match &event {
                        IngestEvent::Session(session) => {
                            // `session_index` lets `drain_pending_dones` find this
                            // session row's outcome later.
                            in_flight = Some(InFlight {
                                project: Some((*session.project).clone()),
                                session_id: session.id.clone(),
                                messages: 0,
                                dropped_events: 0,
                                first_drop_reason: None,
                                session_index: event_index,
                            });
                        }
                        IngestEvent::Message(_) => {
                            if let Some(slot) = in_flight.as_mut() {
                                slot.messages += 1;
                            }
                        }
                        IngestEvent::Part(_) => {}
                    }

                    let validator_start = std::time::Instant::now();
                    let push_outcomes = validator.push(store, index, event).await?;
                    validator_total += validator_start.elapsed();
                    validator_count += 1;
                    for outcome in &push_outcomes {
                        // Rejected non-session rows mark the in-flight session as having
                        // dropped events (-> `Partial`). Only the first reason is kept.
                        if matches!(outcome.status, OutcomeStatus::Error)
                            && outcome.kind != "session"
                            && let Some(slot) = in_flight.as_mut()
                        {
                            slot.dropped_events += 1;
                            if slot.first_drop_reason.is_none() {
                                slot.first_drop_reason =
                                    outcome.error.as_ref().map(|err| err.message.clone());
                            }
                        }
                    }
                    summary.add_outcomes(&push_outcomes);
                    index += 1;

                    if validator.pending_substreams() >= ADAPTER_FLUSH_BATCH {
                        let flush_start = std::time::Instant::now();
                        let (flush_outcomes, flush_counts) = validator.flush(store).await?;
                        validator_total += flush_start.elapsed();
                        validator_count += 1;
                        summary.add_outcomes_errors_only(&flush_outcomes);
                        summary.add_batch(&flush_counts);
                        // NOTE(review): only flush/finish outcomes are searched for session
                        // rows. A session-row error returned by `validator.push`, or one
                        // flushed here while that session was still in flight (and so not
                        // yet queued), will not mark it `Rejected`. Confirm the validator
                        // never reports session rows that way.
                        drain_pending_dones(&mut pending_dones, &flush_outcomes, &mut on_event);
                    }
                }
                Err(error) => {
                    tracing::debug!(
                        %error,
                        "adapter event error (per-line drop by design)"
                    );
                    // NOTE(review): `in_flight` is only cleared by the next `Session` event.
                    // An error from a later input that never yields a `Session` (including
                    // one after a `Skipped` yield) is therefore charged to the previous
                    // session. Confirm this is intended.
                    match in_flight.as_mut() {
                        Some(slot) => {
                            slot.dropped_events += 1;
                            if slot.first_drop_reason.is_none() {
                                slot.first_drop_reason = Some(error.to_string());
                            }
                            summary.dropped_events += 1;
                        }
                        None => {
                            summary.skipped_files += 1;
                            on_event(SyncEvent::SessionDone(SessionOutcome {
                                project: None,
                                session_id: None,
                                messages: 0,
                                status: SyncStatus::Skipped {
                                    reason: error.to_string(),
                                },
                            }));
                        }
                    }
                }
            }
        }

        // Close the last session, then let `finish` process everything still pending.
        if let Some(prev) = in_flight.take() {
            pending_dones.push_back(PendingDone {
                project: prev.project,
                session_id: prev.session_id,
                messages: prev.messages,
                dropped_events: prev.dropped_events,
                first_drop_reason: prev.first_drop_reason,
                session_index: prev.session_index,
            });
        }
        let validator_start = std::time::Instant::now();
        let (final_outcomes, final_counts) = validator.finish(store).await?;
        validator_total += validator_start.elapsed();
        validator_count += 1;
        summary.add_outcomes_errors_only(&final_outcomes);
        summary.add_batch(&final_counts);
        drain_pending_dones(&mut pending_dones, &final_outcomes, &mut on_event);

        summary.truncated_values = crate::adapter::extract::truncated_values_count()
            .saturating_sub(truncations_before) as usize;

        // `total` now means wall-clock run time, shadowing the discovery count above.
        // `other` is the time not spent decoding or in the validator.
        let total = run_started.elapsed();
        let other = total
            .saturating_sub(decode_total)
            .saturating_sub(validator_total);
        tracing::info!(
            target: "pond::perf",
            total_ms = total.as_millis() as u64,
            decode_ms = decode_total.as_millis() as u64,
            validator_ms = validator_total.as_millis() as u64,
            other_ms = other.as_millis() as u64,
            decode_calls = decode_count,
            validator_calls = validator_count,
            rows_inserted = summary.inserted as u64,
            rows_matched = summary.matched as u64,
            dropped_events = summary.dropped_events as u64,
            dropped_sessions = summary.dropped_sessions as u64,
            skipped_files = summary.skipped_files as u64,
            skipped_fresh = summary.skipped_fresh as u64,
            truncated_values = summary.truncated_values as u64,
            "ingest_adapter complete"
        );
        Ok(summary)
    }
419
420 fn drain_pending_dones<F>(
427 queue: &mut std::collections::VecDeque<PendingDone>,
428 outcomes: &[RowOutcome],
429 on_event: &mut F,
430 ) where
431 F: FnMut(SyncEvent),
432 {
433 let mut session_outcome_by_index: std::collections::HashMap<usize, &RowOutcome> =
436 std::collections::HashMap::new();
437 for outcome in outcomes {
438 if outcome.kind == "session" {
439 session_outcome_by_index.insert(outcome.index, outcome);
440 }
441 }
442
443 while let Some(done) = queue.pop_front() {
444 let session_outcome = session_outcome_by_index.get(&done.session_index).copied();
445 let rejection_reason = session_outcome.and_then(|outcome| {
446 if matches!(outcome.status, OutcomeStatus::Error) {
447 Some(
448 outcome
449 .error
450 .as_ref()
451 .map(|err| err.message.clone())
452 .unwrap_or_else(|| "session-level rejection".to_owned()),
453 )
454 } else {
455 None
456 }
457 });
458 let status = if let Some(reason) = rejection_reason {
459 SyncStatus::Rejected { reason }
460 } else if done.dropped_events > 0 {
461 SyncStatus::Partial {
462 dropped_events: done.dropped_events,
463 first_drop_reason: done.first_drop_reason,
464 }
465 } else {
466 SyncStatus::Ok
467 };
468 on_event(SyncEvent::SessionDone(SessionOutcome {
469 project: done.project,
470 session_id: Some(done.session_id),
471 messages: done.messages,
472 status,
473 }));
474 }
475 }
476
477 pub async fn pond_ingest(store: &Store, request: IngestRequest) -> IngestEnvelope {
483 if let Err(envelope) = validate_protocol(request.protocol_version) {
484 return IngestEnvelope::Error(envelope);
485 }
486 if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
487 return IngestEnvelope::Error(envelope);
488 }
489 if request.events.is_empty() {
490 return IngestEnvelope::Error(map_error(crate::Error::validation_field(
491 "events must be a non-empty array",
492 "events",
493 Some(serde_json::json!([])),
494 Some("non-empty array".to_owned()),
495 )));
496 }
497 if request.events.len() > MAX_INGEST_EVENTS {
498 return IngestEnvelope::Error(map_error(crate::Error::validation_field(
499 format!("ingest batch exceeds the event cap: at most {MAX_INGEST_EVENTS} events"),
500 "events",
501 Some(serde_json::json!(request.events.len())),
502 Some(format!("at most {MAX_INGEST_EVENTS} events")),
503 )));
504 }
505
506 match ingest_events(store, request.events).await {
507 Ok(outcomes) => {
508 let mut accepted = 0;
509 let mut rejected = 0;
510 for outcome in &outcomes {
511 match outcome.status {
512 OutcomeStatus::Inserted | OutcomeStatus::Matched => accepted += 1,
513 OutcomeStatus::Error => rejected += 1,
514 }
515 }
516 let results = outcomes
517 .into_iter()
518 .map(outcome_to_result)
519 .collect::<Vec<_>>();
520 IngestEnvelope::Success(IngestResponse {
521 accepted,
522 rejected,
523 results,
524 })
525 }
526 Err(failure) => IngestEnvelope::Error(map_storage(failure)),
527 }
528 }
529
530 pub async fn ingest_events(store: &Store, events: Vec<IngestEvent>) -> Result<Vec<RowOutcome>> {
536 let mut validator = IngestValidator::default();
537 let mut outcomes = Vec::with_capacity(events.len());
538 for (index, event) in events.into_iter().enumerate() {
539 let mut chunk = validator.push(store, index, event).await?;
540 outcomes.append(&mut chunk);
541 }
542 let (mut tail, _counts) = validator.finish(store).await?;
545 outcomes.append(&mut tail);
546 outcomes.sort_by_key(|outcome| outcome.index);
547 Ok(outcomes)
548 }
549
550 fn outcome_to_result(outcome: RowOutcome) -> IngestResult {
551 let (status, error) = match (outcome.status, outcome.error) {
552 (OutcomeStatus::Inserted, _) => (IngestStatus::Inserted, None),
553 (OutcomeStatus::Matched, _) => (IngestStatus::Matched, None),
554 (OutcomeStatus::Error, error) => {
555 let body = error
556 .map(|err| {
557 let mut details = serde_json::Map::new();
558 if let Some(field) = err.field {
559 details.insert("field".to_owned(), serde_json::json!(field));
560 }
561 if let Some(reason) = err.reason {
562 details.insert("reason".to_owned(), serde_json::json!(reason));
563 }
564 ErrorBody {
565 code: ErrorCode::ValidationFailed,
566 message: err.message,
567 details: serde_json::Value::Object(details),
568 }
569 })
570 .unwrap_or_else(|| ErrorBody {
571 code: ErrorCode::ValidationFailed,
572 message: "ingest failed".to_owned(),
573 details: serde_json::json!({}),
574 });
575 (IngestStatus::Error, Some(body))
576 }
577 };
578 IngestResult {
579 index: outcome.index,
580 kind: outcome.kind.to_owned(),
581 pk: outcome.pk,
582 status,
583 error,
584 }
585 }
586}
587
588pub use crate::sessions::{IngestEvent, IngestSummary, IngestValidator, search_text};
589pub use ingest_handler::{
590 MAX_INGEST_EVENTS, SessionOutcome, SyncEvent, SyncStatus, ingest_adapter, ingest_events,
591 pond_ingest,
592};
593
594mod export_handler {
595 use anyhow::{Context, Result};
606 use tokio::io::{AsyncWrite, AsyncWriteExt};
607
608 use crate::sessions::{IngestEvent, Store};
609
    /// Number of records written by `pond_export`.
    #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
    pub struct ExportSummary {
        /// Session records written.
        pub sessions: usize,
        /// Message records written.
        pub messages: usize,
        /// Part records written.
        pub parts: usize,
    }
616
617 pub async fn pond_export<W>(
618 store: &Store,
619 session_filter: Option<&str>,
620 writer: &mut W,
621 ) -> Result<ExportSummary>
622 where
623 W: AsyncWrite + Unpin,
624 {
625 let mut session_ids = match session_filter {
626 Some(id) => vec![id.to_owned()],
627 None => store.session_ids().await?,
628 };
629 session_ids.sort();
630
631 let mut summary = ExportSummary::default();
632 for session_id in session_ids {
633 let Some(stored) = store
634 .get_session(&session_id)
635 .await
636 .with_context(|| format!("export: failed to load session {session_id}"))?
637 else {
638 if session_filter.is_some() {
639 anyhow::bail!("export: session not found: {session_id}");
640 }
641 continue;
642 };
643 write_event(writer, &IngestEvent::Session(stored.session)).await?;
644 summary.sessions += 1;
645 for message_with_parts in stored.messages {
646 write_event(writer, &IngestEvent::Message(message_with_parts.message)).await?;
647 summary.messages += 1;
648 for part in message_with_parts.parts {
649 write_event(writer, &IngestEvent::Part(part)).await?;
650 summary.parts += 1;
651 }
652 }
653 }
654 writer.flush().await.context("export: flush failed")?;
655 Ok(summary)
656 }
657
658 async fn write_event<W>(writer: &mut W, event: &IngestEvent) -> Result<()>
659 where
660 W: AsyncWrite + Unpin,
661 {
662 let line = serde_json::to_string(event).context("export: serialize event")?;
663 writer
664 .write_all(line.as_bytes())
665 .await
666 .context("export: write event")?;
667 writer
668 .write_all(b"\n")
669 .await
670 .context("export: write newline")?;
671 Ok(())
672 }
673}
674
675pub use export_handler::{ExportSummary, pond_export};
676
677mod restore_handler {
678 use anyhow::{Context, Result, bail};
685
686 use crate::sessions::{SessionWithMessages, Store};
687
688 pub async fn restore_lineage(
689 store: &Store,
690 session_id: &str,
691 ) -> Result<Vec<SessionWithMessages>> {
692 let Some(parent) = store.get_session(session_id).await? else {
693 bail!("export: session not found: {session_id}");
694 };
695 let mut sessions = vec![parent];
696 for child in store.child_sessions(session_id).await? {
697 if !store.child_sessions(&child.id).await?.is_empty() {
698 bail!(
699 "adapter-lineage-complete-restore supports one subagent level; session {} has child sessions",
700 child.id
701 );
702 }
703 let child_id = child.id;
704 let stored = store
705 .get_session(&child_id)
706 .await?
707 .with_context(|| format!("export: child session disappeared: {child_id}"))?;
708 sessions.push(stored);
709 }
710 Ok(sessions)
711 }
712}
713
714pub use restore_handler::restore_lineage;
715
716mod get_handler {
717 use crate::{
718 sessions::{GetLookup, MessageViewParams, RetrievedMessage, SessionViewParams, Store},
719 wire::{
720 GetEnvelope, GetRequest, GetResponse, GetResult, GetSession, MessageView, PartSummary,
721 ResponseMode, ResponsePart, validate_protocol,
722 },
723 };
724
725 use super::{map_error, map_storage};
726
727 fn to_message_view(message: RetrievedMessage, verbatim: bool) -> MessageView {
732 if verbatim {
733 return MessageView {
734 id: message.id,
735 role: message.role,
736 timestamp: message.timestamp,
737 text: None,
738 content: None,
739 parts_summary: Vec::new(),
740 parts: Some(
741 message
742 .parts
743 .into_iter()
744 .map(ResponsePart::from_part)
745 .collect(),
746 ),
747 };
748 }
749 let parts_summary = message
750 .parts
751 .iter()
752 .filter_map(|part| PartSummary::for_kind(&part.kind))
753 .collect();
754 MessageView {
755 id: message.id,
756 role: message.role,
757 timestamp: message.timestamp,
758 text: message.text,
759 content: message.content,
760 parts_summary,
761 parts: None,
762 }
763 }
764
    /// Byte budget passed as `budget_bytes` to the store's session and message views.
    const BUDGET_BYTES: usize = 200_000;
770
771 pub async fn pond_get(store: &Store, request: GetRequest) -> GetEnvelope {
772 if let Err(error) = validate_protocol(request.protocol_version) {
773 return GetEnvelope::Error(error);
774 }
775 if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
776 return GetEnvelope::Error(envelope);
777 }
778
779 let response = match (&request.session_id, &request.message_id) {
780 (Some(session_id), None) => session_result(store, session_id, &request).await,
781 (None, Some(message_id)) => message_result(store, message_id, &request).await,
782 (Some(_), Some(_)) => Err(map_error(crate::Error::validation_field(
783 "session_id and message_id are mutually exclusive",
784 "message_id",
785 request.message_id.clone().map(serde_json::Value::String),
786 Some("omit when session_id is present".to_owned()),
787 ))),
788 (None, None) => Err(map_error(crate::Error::validation(
789 "one of session_id or message_id is required",
790 ))),
791 };
792
793 match response {
794 Ok(response) => GetEnvelope::Success(response),
795 Err(error) => GetEnvelope::Error(error),
796 }
797 }
798
799 fn unknown_after_id(request: &GetRequest, anchor_of: &str) -> crate::wire::ErrorEnvelope {
802 map_error(crate::Error::validation_field(
803 "after_id not found (stale or mistyped continuation anchor)",
804 "after_id",
805 request.after_id.clone().map(serde_json::Value::String),
806 Some(format!("a {anchor_of} from a prior page of this read")),
807 ))
808 }
809
810 async fn session_result(
811 store: &Store,
812 session_id: &str,
813 request: &GetRequest,
814 ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
815 let params = SessionViewParams {
816 mode: request.response_mode,
817 after_id: request.after_id.as_deref(),
818 limit: request.limit,
819 budget_bytes: BUDGET_BYTES,
820 };
821 let view = match store
822 .session_view(session_id, params)
823 .await
824 .map_err(map_storage)?
825 {
826 GetLookup::NotFound => {
827 return Err(map_error(crate::Error::not_found(
828 "session",
829 serde_json::json!(session_id),
830 format!("session not found: {session_id}"),
831 )));
832 }
833 GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "message id")),
834 GetLookup::Found(view) => view,
835 };
836 let verbatim = matches!(request.response_mode, ResponseMode::Verbatim);
837 Ok(GetResponse {
838 session: GetSession::from_session(&view.session),
839 result: GetResult::Session {
840 messages: view
841 .messages
842 .into_iter()
843 .map(|message| to_message_view(message, verbatim))
844 .collect(),
845 messages_remaining: view.messages_remaining,
846 },
847 })
848 }
849
850 async fn message_result(
851 store: &Store,
852 message_id: &str,
853 request: &GetRequest,
854 ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
855 let params = MessageViewParams {
856 context_depth: request.context_depth,
857 after_id: request.after_id.as_deref(),
858 limit: request.limit,
859 budget_bytes: BUDGET_BYTES,
860 };
861 let view = match store
862 .message_view(message_id, params)
863 .await
864 .map_err(map_storage)?
865 {
866 GetLookup::NotFound => {
867 return Err(map_error(crate::Error::not_found(
868 "message",
869 serde_json::json!(message_id),
870 format!("message not found: {message_id}"),
871 )));
872 }
873 GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "part id")),
874 GetLookup::Found(view) => view,
875 };
876 let target = MessageView {
879 id: view.target.id,
880 role: view.target.role,
881 timestamp: view.target.timestamp,
882 text: None,
883 content: None,
884 parts_summary: Vec::new(),
885 parts: None,
886 };
887 Ok(GetResponse {
888 session: GetSession::from_session(&view.session),
889 result: GetResult::Message {
890 target,
891 target_parts: view
892 .target_parts
893 .into_iter()
894 .map(ResponsePart::from_part)
895 .collect(),
896 target_parts_remaining: view.target_parts_remaining,
897 siblings: view
898 .siblings
899 .into_iter()
900 .map(|sibling| to_message_view(sibling, false))
901 .collect(),
902 },
903 })
904 }
905}
906
907pub use get_handler::pond_get;
908
909mod search_handler {
910 use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
915
916 use crate::{
917 Clock, SystemClock,
918 embed::{Embedder, LazyEmbedder, format_query},
919 sessions::{MessageKey, MessageMeta, Store},
920 substrate::{Predicate, ScalarValue},
921 wire::{
922 ErrorEnvelope, PartSummary, ProjectFilter, Role, SearchCursor, SearchEnvelope,
923 SearchFilters, SearchRequest, SearchResponse, SearchResult, SearchSession,
924 validate_protocol,
925 },
926 };
927 use chrono::NaiveDate;
928 use std::collections::HashMap;
929
930 use super::{map_error, map_storage};
931
    /// Retrieval strategy for a search.
    ///
    /// `run_search` uses `Vector` whenever `similar_to` is set. Otherwise an explicit
    /// override wins, falling back to `Hybrid` when embeddings exist and `Fts` when they
    /// do not (see `resolve_effective_mode`).
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum SearchMode {
        /// FTS and vector results fused by `fuse_arms`.
        Hybrid,
        /// Full-text search only.
        Fts,
        /// Vector search only, seeded by the query embedding or a stored message vector.
        Vector,
    }
942
    /// Normalized search request produced by `plan_search`.
    #[derive(Debug, Clone, PartialEq)]
    pub struct SearchPlan {
        /// Retrieval mode. `plan_search` stores its `mode` argument and callers overwrite it.
        pub mode: SearchMode,
        /// Query text, trimmed.
        pub query: String,
        /// Trimmed message id to search around, or `None` when absent or blank.
        pub similar_to: Option<String>,
        /// Storage predicate built from `filters` by `build_filter`.
        pub filter: Predicate,
        /// Filters as requested, or as replayed from a continuation cursor.
        pub filters: SearchFilters,
        /// FTS candidate pool: `max(limit * 5, 50)` (saturating multiply).
        pub pool: usize,
        /// Vector candidate pool: twice `pool` (saturating).
        pub vector_pool: usize,
        /// Page size, capped at `LIMIT_CAP`.
        pub limit: usize,
        /// Pagination offset taken from the cursor, `0` for a first page.
        pub offset: usize,
        /// Minimum score; `run_search` drops candidates scoring below it.
        pub min_score: f64,
    }
959
    /// Upper bound applied to a request's `limit` in `plan_search`.
    const LIMIT_CAP: usize = 200;
    // NOTE(review): the next three are not referenced in the visible code; by name they cap
    // matches per session, the response size, and snippet length. Confirm at their use sites.
    const MAX_MATCHES_PER_SESSION: usize = 3;
    const SEARCH_BUDGET_BYTES: usize = 60_000;
    const HIT_SNIPPET_CHARS: usize = 600;
    /// Sum of the two fusion weights.
    const SCORE_DENOMINATOR: f64 = FTS_FUSION_WEIGHT + VECTOR_FUSION_WEIGHT;

    /// Weight of the FTS list when `run_search` fuses hybrid results.
    const FTS_FUSION_WEIGHT: f64 = 0.135;
    /// Weight of the vector list when `run_search` fuses hybrid results.
    const VECTOR_FUSION_WEIGHT: f64 = 1.0;
979
980 fn encode_search_cursor(cursor: &SearchCursor) -> String {
981 #[allow(clippy::expect_used)]
982 let bytes = serde_json::to_vec(cursor).expect("search cursor encodes as JSON");
983 URL_SAFE_NO_PAD.encode(bytes)
984 }
985
986 fn decode_search_cursor(raw: &str) -> Result<SearchCursor, ErrorEnvelope> {
987 let bytes = URL_SAFE_NO_PAD.decode(raw).map_err(|_| {
988 map_error(crate::Error::validation_field(
989 "cursor is malformed (expected opaque value from a prior response)",
990 "cursor",
991 Some(serde_json::json!(raw)),
992 Some("opaque base64url".to_owned()),
993 ))
994 })?;
995 serde_json::from_slice(&bytes).map_err(|_| {
996 map_error(crate::Error::validation_field(
997 "cursor is malformed (decode failed)",
998 "cursor",
999 Some(serde_json::json!(raw)),
1000 Some("opaque cursor from a prior response".to_owned()),
1001 ))
1002 })
1003 }
1004
1005 pub async fn pond_search(
1014 store: &Store,
1015 embedder: &LazyEmbedder,
1016 request: SearchRequest,
1017 search: &crate::config::SearchConfig,
1018 ) -> SearchEnvelope {
1019 match run_search(store, embedder, request, search, &SystemClock).await {
1020 Ok(response) => SearchEnvelope::Success(response),
1021 Err(envelope) => SearchEnvelope::Error(envelope),
1022 }
1023 }
1024
1025 pub async fn explain_search_plan(
1026 store: &Store,
1027 embedder: &LazyEmbedder,
1028 request: SearchRequest,
1029 search: &crate::config::SearchConfig,
1030 ) -> Result<String, ErrorEnvelope> {
1031 let override_mode = request.mode_override.map(wire_mode_to_internal);
1032 let mut plan = plan_search(request, SearchMode::Fts)?;
1033 plan.mode = resolve_effective_mode(store, override_mode).await?;
1034 let mut out = String::new();
1035 if matches!(plan.mode, SearchMode::Fts | SearchMode::Hybrid) {
1036 let fts = store
1037 .explain_fts_plan(&plan.query, plan.pool, &plan.filter)
1038 .await
1039 .map_err(map_storage)?;
1040 out.push_str("fts:\n");
1041 out.push_str(&fts);
1042 out.push('\n');
1043 }
1044 if matches!(plan.mode, SearchMode::Vector | SearchMode::Hybrid) {
1045 let backend = load_embedder(embedder).await?;
1046 let vector = embed_query(backend.as_ref(), &plan.query)?;
1047 let vector_plan = store
1048 .explain_vector_plan(&vector, plan.vector_pool, &plan.filter, Some(search))
1049 .await
1050 .map_err(map_storage)?;
1051 out.push_str("vector:\n");
1052 out.push_str(&vector_plan);
1053 out.push('\n');
1054 }
1055 Ok(out)
1056 }
1057
    /// Executes a planned search and returns one page of session-grouped results.
    ///
    /// Pipeline:
    /// 1. Plan the request and pick a mode (`Vector` when `similar_to` is set, otherwise via
    ///    `resolve_effective_mode`).
    /// 2. Gather candidates:
    ///    - FTS only;
    ///    - vector only (query embedding or the `similar_to` message's stored vector);
    ///    - or hybrid, where FTS and vector run concurrently and are fused by `fuse_arms`.
    /// 3. Load message metadata; drop candidates without metadata or below `min_score`.
    /// 4. Sort by descending score, then by session id and message id.
    /// 5. Group into sessions and paginate.
    ///
    /// `_clock` is accepted but unused here.
    ///
    /// # Errors
    /// Returns an [`ErrorEnvelope`] for:
    /// - planning errors;
    /// - embedder load or embed failures;
    /// - a `similar_to` id with no stored vector;
    /// - store failures, including those from `build_sessions`.
    async fn run_search(
        store: &Store,
        embedder: &LazyEmbedder,
        request: SearchRequest,
        search: &crate::config::SearchConfig,
        _clock: &dyn Clock,
    ) -> Result<SearchResponse, ErrorEnvelope> {
        let override_mode = request.mode_override.map(wire_mode_to_internal);
        let mut plan = plan_search(request, SearchMode::Fts)?;

        // `similar_to` ignores any mode override: only a vector search makes sense for it.
        if plan.similar_to.is_some() {
            plan.mode = SearchMode::Vector;
        } else {
            plan.mode = resolve_effective_mode(store, override_mode).await?;
        }

        let candidates = match plan.mode {
            SearchMode::Fts => {
                let hits = store
                    .fts_search(&plan.query, plan.pool, &plan.filter)
                    .await
                    .map_err(map_storage)?;
                normalize_fts(hits)
            }
            SearchMode::Hybrid => {
                let backend = load_embedder(embedder).await?;
                let vector = embed_query(backend.as_ref(), &plan.query)?;
                // Both retrievals run concurrently; the first error aborts both.
                let fts_fut = async {
                    store
                        .fts_search(&plan.query, plan.pool, &plan.filter)
                        .await
                        .map_err(map_storage)
                };
                let vector_fut = async {
                    store
                        .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
                        .await
                        .map_err(map_storage)
                };
                let (fts, vector_raw) = tokio::try_join!(fts_fut, vector_fut)?;
                // FTS scores are divided by the best score. The fold starts at 0.0, so when
                // no score is positive every entry gets 0.0.
                let fts_max = fts.iter().map(|(_, s)| *s).fold(0.0_f32, f32::max);
                let fts_entries: Vec<(MessageKey, f64)> = fts
                    .into_iter()
                    .map(|(key, score)| {
                        let normed = if fts_max > 0.0 {
                            f64::from(score / fts_max)
                        } else {
                            0.0
                        };
                        (key, normed)
                    })
                    .collect();
                // Vector results are scored by rank only (1.0 for the first, decreasing by
                // 1/n per position); the raw vector score is discarded.
                let vec_n = vector_raw.len() as f64;
                let vector_entries: Vec<(MessageKey, f64)> = vector_raw
                    .into_iter()
                    .enumerate()
                    .map(|(idx, (key, _))| {
                        let normed = if vec_n > 0.0 {
                            1.0 - (idx as f64 / vec_n)
                        } else {
                            0.0
                        };
                        (key, normed)
                    })
                    .collect();
                let lists = [
                    RankedList {
                        retriever: RetrieverKind::Fts,
                        entries: fts_entries,
                        weight: FTS_FUSION_WEIGHT,
                    },
                    RankedList {
                        retriever: RetrieverKind::Vector,
                        entries: vector_entries,
                        weight: VECTOR_FUSION_WEIGHT,
                    },
                ];
                fuse_arms(&lists)
                    .into_iter()
                    .map(|hit| Candidate {
                        session_id: hit.key.session_id,
                        message_id: hit.key.message_id,
                        base_score: hit.score,
                    })
                    .collect()
            }
            SearchMode::Vector => {
                // With `similar_to`, seed from that message's stored embedding instead of
                // embedding the query text.
                let vector = if let Some(similar_id) = &plan.similar_to {
                    let stored = store
                        .message_vector_by_id(similar_id)
                        .await
                        .map_err(map_storage)?;
                    let Some(vector) = stored else {
                        return Err(map_error(crate::Error::not_found(
                            "message",
                            serde_json::json!(similar_id),
                            format!(
                                "no embedded message with id {similar_id} (the message may not \
                                 exist, or it exists but is not yet embedded - run `pond embed`)"
                            ),
                        )));
                    };
                    vector
                } else {
                    let backend = load_embedder(embedder).await?;
                    embed_query(backend.as_ref(), &plan.query)?
                };
                let vector_raw = store
                    .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
                    .await
                    .map_err(map_storage)?;
                normalize_vector(vector_raw)
            }
        };

        if candidates.is_empty() {
            return Ok(empty_response());
        }

        let keys = candidates
            .iter()
            .map(|candidate| MessageKey {
                session_id: candidate.session_id.clone(),
                message_id: candidate.message_id.clone(),
            })
            .collect::<Vec<_>>();
        let metas = store
            .message_metas_by_keys(&keys)
            .await
            .map_err(map_storage)?;
        let meta_index = metas
            .iter()
            .map(|meta| ((meta.session_id.as_str(), meta.message_id.as_str()), meta))
            .collect::<std::collections::HashMap<_, _>>();

        let mut scored = Vec::with_capacity(candidates.len());
        for candidate in candidates {
            // Candidates whose metadata was not returned are silently dropped.
            let Some(meta) =
                meta_index.get(&(candidate.session_id.as_str(), candidate.message_id.as_str()))
            else {
                continue;
            };
            let score = candidate.base_score;
            if score < plan.min_score {
                continue;
            }
            scored.push(ScoredHit {
                meta: (*meta).clone(),
                score,
            });
        }
        // Descending score. Incomparable scores (NaN) are treated as equal, and ties are
        // broken by session id, then message id, for a deterministic order.
        scored.sort_by(|left, right| {
            right
                .score
                .partial_cmp(&left.score)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.meta.session_id.cmp(&right.meta.session_id))
                .then_with(|| left.meta.message_id.cmp(&right.meta.message_id))
        });

        let matched_total = scored.len();
        let sessions = build_sessions(store, &scored, &plan.query).await?;
        page_sessions(sessions, matched_total, &plan)
    }
1253
1254 async fn resolve_effective_mode(
1259 store: &Store,
1260 override_mode: Option<SearchMode>,
1261 ) -> Result<SearchMode, ErrorEnvelope> {
1262 if let Some(mode) = override_mode {
1263 return Ok(mode);
1264 }
1265 let has = store.has_embeddings().await.map_err(map_storage)?;
1266 Ok(if has {
1267 SearchMode::Hybrid
1268 } else {
1269 SearchMode::Fts
1270 })
1271 }
1272
1273 async fn load_embedder(
1277 embedder: &LazyEmbedder,
1278 ) -> Result<std::sync::Arc<dyn Embedder>, ErrorEnvelope> {
1279 embedder.get().await.map_err(|error| {
1280 map_error(crate::Error::internal(format!(
1281 "embedder load failed: {error}"
1282 )))
1283 })
1284 }
1285
1286 pub fn plan_search(
1287 request: SearchRequest,
1288 mode: SearchMode,
1289 ) -> Result<SearchPlan, ErrorEnvelope> {
1290 validate_protocol(request.protocol_version)?;
1291
1292 let _ns = super::resolve_namespace(request.namespace.as_deref())?;
1293
1294 let cursor = match request.cursor.as_deref() {
1295 Some(raw) => Some(decode_search_cursor(raw)?),
1296 None => None,
1297 };
1298 let (query_raw, similar_raw, filters, offset) = match cursor {
1299 Some(cursor) => (
1300 cursor.query,
1301 cursor.similar_to,
1302 cursor.filters,
1303 cursor.offset,
1304 ),
1305 None => (request.query, request.similar_to, request.filters, 0),
1306 };
1307 let query = query_raw.trim().to_owned();
1308 let similar_to = similar_raw
1309 .as_ref()
1310 .map(|id| id.trim().to_owned())
1311 .filter(|id| !id.is_empty());
1312 if similar_to.is_none() && query.is_empty() {
1313 return Err(map_error(crate::Error::validation_field(
1314 "query must be non-empty after trim",
1315 "query",
1316 Some(serde_json::json!(query_raw)),
1317 Some("non-empty string after trim, or pass `similar_to`".to_owned()),
1318 )));
1319 }
1320 if request.limit == 0 {
1321 return Err(map_error(crate::Error::validation_field(
1322 "limit must be at least 1",
1323 "limit",
1324 Some(serde_json::json!(request.limit)),
1325 Some("integer >= 1".to_owned()),
1326 )));
1327 }
1328 let limit = request.limit.min(LIMIT_CAP);
1329 let min_score = filters.min_score;
1330 let filter = build_filter(&filters)?;
1331 let pool = limit.saturating_mul(5).max(50);
1334 Ok(SearchPlan {
1335 mode,
1336 query,
1337 similar_to,
1338 filter,
1339 filters,
1340 pool,
1341 vector_pool: pool.saturating_mul(2),
1342 limit,
1343 offset,
1344 min_score,
1345 })
1346 }
1347
/// Identifies which first-stage retriever produced a `RankedList`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetrieverKind {
    /// The vector (embedding) search arm.
    Vector,
    /// The full-text search arm.
    Fts,
}

impl RetrieverKind {
    /// Lowercase label recorded in `FusedHit::matched_via` for this retriever.
    fn as_wire(self) -> &'static str {
        if self == RetrieverKind::Fts {
            "fts"
        } else {
            "vector"
        }
    }
}
1362
1363 pub struct RankedList {
1370 pub retriever: RetrieverKind,
1371 pub entries: Vec<(MessageKey, f64)>,
1372 pub weight: f64,
1373 }
1374
1375 fn wire_mode_to_internal(wire: crate::wire::SearchModeWire) -> SearchMode {
1378 match wire {
1379 crate::wire::SearchModeWire::Fts => SearchMode::Fts,
1380 crate::wire::SearchModeWire::Vector => SearchMode::Vector,
1381 crate::wire::SearchModeWire::Hybrid => SearchMode::Hybrid,
1382 }
1383 }
1384
1385 #[derive(Debug, Clone, PartialEq)]
1387 pub struct FusedHit {
1388 pub key: MessageKey,
1389 pub score: f64,
1390 pub matched_via: Vec<String>,
1391 }
1392
/// Returns the part of `session_id` before the first `/`, or the whole id when it has
/// none. Subagent sessions are named `root/...`, so this maps them onto their root.
fn session_root(session_id: &str) -> &str {
    session_id
        .split_once('/')
        .map_or(session_id, |(root, _)| root)
}
1403
1404 pub fn fuse_arms(lists: &[RankedList]) -> Vec<FusedHit> {
1427 let mut merged: std::collections::HashMap<String, (f64, Vec<String>, MessageKey)> =
1428 std::collections::HashMap::new();
1429 for list in lists {
1430 if list.entries.is_empty() {
1431 continue;
1432 }
1433 let mut lo = f64::INFINITY;
1442 let mut hi = f64::NEG_INFINITY;
1443 for (_, raw) in &list.entries {
1444 if *raw < lo {
1445 lo = *raw;
1446 }
1447 if *raw > hi {
1448 hi = *raw;
1449 }
1450 }
1451 let range = hi - lo;
1452 let mut seen_in_arm: std::collections::HashSet<String> =
1457 std::collections::HashSet::new();
1458 for (key, raw) in &list.entries {
1459 let root = session_root(&key.session_id).to_owned();
1460 if !seen_in_arm.insert(root.clone()) {
1461 continue;
1462 }
1463 let norm = if range > 0.0 { (raw - lo) / range } else { 0.0 };
1464 let contribution = list.weight * norm;
1465 let entry = merged
1466 .entry(root)
1467 .or_insert_with(|| (0.0, Vec::new(), key.clone()));
1468 entry.0 += contribution;
1469 entry.1.push(list.retriever.as_wire().to_owned());
1470 }
1471 }
1472 let mut hits = merged
1473 .into_values()
1474 .map(|(score, matched_via, key)| FusedHit {
1475 key,
1476 score,
1477 matched_via,
1478 })
1479 .collect::<Vec<_>>();
1480 hits.sort_by(|left, right| {
1481 right
1482 .score
1483 .partial_cmp(&left.score)
1484 .unwrap_or(std::cmp::Ordering::Equal)
1485 .then_with(|| left.key.cmp(&right.key))
1486 });
1487 hits
1488 }
1489
/// Minimum character length for a query term to anchor a snippet window when the
/// query contains any term at least this long (see `query_snippet`).
const ANCHOR_MIN_TERM_CHARS: usize = 4;
1495
1496 pub fn hit_payload(text: &str, query: &str) -> String {
1501 let chars_len = text.chars().count();
1502 if chars_len <= HIT_SNIPPET_CHARS {
1503 return text.to_owned();
1504 }
1505 query_snippet(text, query)
1506 }
1507
1508 fn query_snippet(text: &str, query: &str) -> String {
1524 let lower_text = text.to_lowercase();
1525 let terms: Vec<String> = query
1526 .split_whitespace()
1527 .filter(|term| !term.is_empty())
1528 .map(str::to_lowercase)
1529 .collect();
1530 let any_informative = terms
1531 .iter()
1532 .any(|term| term.chars().count() >= ANCHOR_MIN_TERM_CHARS);
1533 let hit = terms
1534 .iter()
1535 .filter(|term| !any_informative || term.chars().count() >= ANCHOR_MIN_TERM_CHARS)
1536 .filter_map(|term| lower_text.find(term.as_str()))
1537 .min();
1538 let chars: Vec<char> = text.chars().collect();
1539 let center = hit
1543 .map(|byte| lower_text[..byte].chars().count())
1544 .unwrap_or(0);
1545 let half = HIT_SNIPPET_CHARS / 2;
1546 let start = center.saturating_sub(half);
1547 let end = (start + HIT_SNIPPET_CHARS).min(chars.len());
1548 let start = end.saturating_sub(HIT_SNIPPET_CHARS);
1549 let mut snippet = String::new();
1553 if start > 0 {
1554 snippet.push_str(&format!("[{start} chars before] "));
1555 }
1556 snippet.extend(&chars[start..end]);
1557 if end < chars.len() {
1558 snippet.push_str(&format!(
1559 " [+{} more chars; pond_get for full]",
1560 chars.len() - end
1561 ));
1562 }
1563 snippet
1564 }
1565
/// A retrieved message with a per-retriever base score, as produced by
/// `normalize_fts` / `normalize_vector`.
struct Candidate {
    /// Session the message belongs to.
    session_id: String,
    /// Message identifier within the session.
    message_id: String,
    /// Retriever-local score after that retriever's normalisation.
    base_score: f64,
}
1571
1572 struct ScoredHit {
1573 meta: MessageMeta,
1574 score: f64,
1575 }
1576
1577 impl ScoredHit {
1578 fn to_search_result(
1579 &self,
1580 query: &str,
1581 summaries: &HashMap<(String, String), Vec<PartSummary>>,
1582 ) -> Result<SearchResult, ErrorEnvelope> {
1583 let text = hit_payload(&self.meta.search_text, query);
1584 let role = match self.meta.role.as_str() {
1585 "system" => Role::System,
1586 "user" => Role::User,
1587 "assistant" => Role::Assistant,
1588 "tool" => Role::Tool,
1589 other => {
1590 return Err(map_error(crate::Error::internal(format!(
1591 "stored message has unknown role: {other}"
1592 ))));
1593 }
1594 };
1595 let parts_summary = if matches!(role, Role::User) {
1598 summaries
1599 .get(&(self.meta.session_id.clone(), self.meta.message_id.clone()))
1600 .cloned()
1601 .unwrap_or_default()
1602 } else {
1603 Vec::new()
1604 };
1605 Ok(SearchResult {
1606 message_id: self.meta.message_id.clone(),
1607 role,
1608 timestamp: self.meta.timestamp,
1609 text,
1610 score: normalize_score(self.score),
1611 parts_summary,
1612 })
1613 }
1614 }
1615
1616 fn normalize_score(score: f64) -> f64 {
1617 (score / SCORE_DENOMINATOR).clamp(0.0, 1.0)
1618 }
1619
1620 fn normalize_fts(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1621 let max = hits.iter().map(|(_, score)| *score).fold(0.0_f32, f32::max);
1622 hits.into_iter()
1623 .map(|(key, score)| Candidate {
1624 session_id: key.session_id,
1625 message_id: key.message_id,
1626 base_score: if max > 0.0 {
1627 f64::from(score / max)
1628 } else {
1629 0.0
1630 },
1631 })
1632 .collect()
1633 }
1634
1635 fn normalize_vector(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1636 let n = hits.len() as f64;
1637 hits.into_iter()
1638 .enumerate()
1639 .map(|(idx, (key, _))| Candidate {
1640 session_id: key.session_id,
1641 message_id: key.message_id,
1642 base_score: if n > 0.0 { 1.0 - (idx as f64 / n) } else { 0.0 },
1643 })
1644 .collect()
1645 }
1646
1647 fn embed_query(embedder: &dyn Embedder, query: &str) -> Result<Vec<f32>, ErrorEnvelope> {
1648 let prompt = format_query(query);
1649 let vectors =
1653 tokio::task::block_in_place(|| embedder.embed(&[prompt])).map_err(|error_value| {
1654 map_error(crate::Error::internal(format!(
1655 "failed to embed query: {error_value}"
1656 )))
1657 })?;
1658 vectors.into_iter().next().ok_or_else(|| {
1659 map_error(crate::Error::internal(
1660 "embedder returned no vector for query",
1661 ))
1662 })
1663 }
1664
1665 async fn build_sessions(
1666 store: &Store,
1667 scored: &[ScoredHit],
1668 query: &str,
1669 ) -> Result<Vec<SearchSession>, ErrorEnvelope> {
1670 use std::collections::BTreeMap;
1671
1672 struct Acc {
1673 project: String,
1674 source_agent: String,
1675 matched_count: usize,
1676 matches: Vec<SearchResult>,
1677 }
1678 let mut user_ids_by_session: BTreeMap<String, Vec<String>> = BTreeMap::new();
1682 for hit in scored {
1683 if hit.meta.role == "user" {
1684 user_ids_by_session
1685 .entry(hit.meta.session_id.clone())
1686 .or_default()
1687 .push(hit.meta.message_id.clone());
1688 }
1689 }
1690 let mut summaries: HashMap<(String, String), Vec<PartSummary>> = HashMap::new();
1691 for (session_id, message_ids) in &user_ids_by_session {
1692 for (key, parts) in store
1693 .summary_parts_for_messages(session_id, message_ids)
1694 .await
1695 .map_err(map_storage)?
1696 {
1697 summaries.insert(
1698 key,
1699 parts
1700 .iter()
1701 .filter_map(|part| PartSummary::for_kind(&part.kind))
1702 .collect(),
1703 );
1704 }
1705 }
1706
1707 let mut groups: BTreeMap<String, Acc> = BTreeMap::new();
1708 for hit in scored {
1709 let root = session_root(&hit.meta.session_id).to_owned();
1710 let entry = groups.entry(root).or_insert_with(|| Acc {
1711 project: hit.meta.project.clone(),
1712 source_agent: hit.meta.source_agent.clone(),
1713 matched_count: 0,
1714 matches: Vec::new(),
1715 });
1716 entry.matched_count += 1;
1717 entry.matches.push(hit.to_search_result(query, &summaries)?);
1718 }
1719
1720 let session_ids = groups.keys().cloned().collect::<Vec<_>>();
1721 let counts = store
1722 .session_message_counts(&session_ids)
1723 .await
1724 .map_err(map_storage)?;
1725
1726 let mut result = groups
1727 .into_iter()
1728 .map(|(session_id, mut acc)| {
1729 acc.matches.sort_by(|left, right| {
1730 right
1731 .score
1732 .partial_cmp(&left.score)
1733 .unwrap_or(std::cmp::Ordering::Equal)
1734 .then_with(|| left.message_id.cmp(&right.message_id))
1735 });
1736 acc.matches.truncate(MAX_MATCHES_PER_SESSION);
1737 SearchSession {
1738 session_messages_count: counts.get(&session_id).copied().unwrap_or_default(),
1739 session_id,
1740 project: acc.project,
1741 source_agent: acc.source_agent,
1742 matched_message_count: acc.matched_count,
1743 matches: acc.matches,
1744 }
1745 })
1746 .collect::<Vec<_>>();
1747 result.sort_by(|left, right| {
1748 let left_score = left
1749 .matches
1750 .first()
1751 .map(|hit| hit.score)
1752 .unwrap_or_default();
1753 let right_score = right
1754 .matches
1755 .first()
1756 .map(|hit| hit.score)
1757 .unwrap_or_default();
1758 right_score
1759 .partial_cmp(&left_score)
1760 .unwrap_or(std::cmp::Ordering::Equal)
1761 .then_with(|| left.session_id.cmp(&right.session_id))
1762 });
1763 Ok(result)
1764 }
1765
1766 fn page_sessions(
1767 sessions: Vec<SearchSession>,
1768 matched_total: usize,
1769 plan: &SearchPlan,
1770 ) -> Result<SearchResponse, ErrorEnvelope> {
1771 if plan.offset >= sessions.len() {
1772 return Ok(SearchResponse {
1773 sessions: Vec::new(),
1774 matched_total,
1775 has_more: false,
1776 next_cursor: None,
1777 });
1778 }
1779
1780 let mut emitted = Vec::new();
1781 let mut used_bytes = 0usize;
1782 for session in sessions.iter().skip(plan.offset) {
1783 if emitted.len() >= plan.limit {
1784 break;
1785 }
1786 let bytes = serde_json::to_string(session)
1787 .map_err(|error| {
1788 map_error(crate::Error::internal(format!(
1789 "failed to size search response session: {error}"
1790 )))
1791 })?
1792 .len();
1793 if !emitted.is_empty() && used_bytes.saturating_add(bytes) > SEARCH_BUDGET_BYTES {
1794 break;
1795 }
1796 used_bytes = used_bytes.saturating_add(bytes);
1797 emitted.push(session.clone());
1798 }
1799
1800 let next_offset = plan.offset + emitted.len();
1801 let has_more = next_offset < sessions.len();
1802 let next_cursor = has_more.then(|| {
1803 encode_search_cursor(&SearchCursor {
1804 query: plan.query.clone(),
1805 similar_to: plan.similar_to.clone(),
1806 filters: plan.filters.clone(),
1807 offset: next_offset,
1808 })
1809 });
1810
1811 Ok(SearchResponse {
1812 sessions: emitted,
1813 matched_total,
1814 has_more,
1815 next_cursor,
1816 })
1817 }
1818
1819 pub fn build_filter(filters: &SearchFilters) -> Result<Predicate, ErrorEnvelope> {
1823 let mut clauses = Vec::new();
1824
1825 match &filters.project {
1826 None => {}
1827 Some(ProjectFilter::Contains(value)) => {
1828 clauses.push(Predicate::LikeContains("project", value.clone()));
1829 }
1830 Some(ProjectFilter::Regex(pattern)) => {
1831 clauses.push(Predicate::Regex("project", pattern.clone()));
1832 }
1833 }
1834
1835 if let Some(session_id) = &filters.session_id {
1836 clauses.push(Predicate::Eq("session_id", session_id.clone().into()));
1837 }
1838 if let Some(source_agent) = &filters.source_agent {
1839 clauses.push(Predicate::Eq("source_agent", source_agent.clone().into()));
1840 }
1841 if let Some(role) = &filters.role {
1842 if !matches!(role.as_str(), "user" | "assistant" | "system" | "tool") {
1843 return Err(map_error(crate::Error::validation_field(
1844 format!(
1845 "filters.role must be one of: user, assistant, system, tool; got {role}"
1846 ),
1847 "filters.role",
1848 Some(serde_json::json!(role)),
1849 Some("one of: user, assistant, system, tool".to_owned()),
1850 )));
1851 }
1852 clauses.push(Predicate::Eq("role", role.clone().into()));
1853 }
1854 if let Some(from_date) = &filters.from_date {
1855 clauses.push(Predicate::Gte(
1856 "timestamp",
1857 ScalarValue::Raw(date_bound(from_date, "filters.from_date", false)?),
1858 ));
1859 }
1860 if let Some(to_date) = &filters.to_date {
1861 clauses.push(Predicate::Lte(
1862 "timestamp",
1863 ScalarValue::Raw(date_bound(to_date, "filters.to_date", true)?),
1864 ));
1865 }
1866
1867 Ok(Predicate::And(clauses))
1868 }
1869
1870 fn date_bound(date: &str, field: &str, end_of_day: bool) -> Result<String, ErrorEnvelope> {
1873 NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| {
1874 map_error(crate::Error::validation_field(
1875 format!("{field} must be in YYYY-MM-DD format; got {date}"),
1876 field,
1877 Some(serde_json::json!(date)),
1878 Some("YYYY-MM-DD".to_owned()),
1879 ))
1880 })?;
1881 let time = if end_of_day { "23:59:59" } else { "00:00:00" };
1882 Ok(format!("timestamp '{date} {time}'"))
1883 }
1884
1885 fn empty_response() -> SearchResponse {
1886 SearchResponse {
1887 sessions: Vec::new(),
1888 matched_total: 0,
1889 has_more: false,
1890 next_cursor: None,
1891 }
1892 }
1893
#[cfg(test)]
mod fusion_helpers_tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;

    /// Builds a `MessageKey` fixture from a session id and a message id.
    fn msg_key(session_id: &str, message_id: &str) -> crate::sessions::MessageKey {
        crate::sessions::MessageKey {
            session_id: session_id.to_owned(),
            message_id: message_id.to_owned(),
        }
    }

    #[test]
    fn session_root_strips_agent_suffix_for_claude_code_subagents() {
        const ROOT: &str = "94a50f23-1234-5678-9abc-def012345678";
        assert_eq!(session_root(ROOT), ROOT);
        let subagent = format!("{ROOT}/agent-abc123");
        assert_eq!(session_root(&subagent), ROOT);
        // Only the first separator matters.
        assert_eq!(session_root("root/a/b"), "root");
    }

    #[test]
    fn fuse_arms_dedupes_intra_arm_by_session_root_and_credits_cross_arm() {
        // FTS lists session-A twice plus a subagent of A; only A's first entry counts.
        let fts = RankedList {
            retriever: RetrieverKind::Fts,
            entries: vec![
                (msg_key("session-A", "msg-1"), 10.0),
                (msg_key("session-A", "msg-2"), 9.0),
                (msg_key("session-B", "msg-3"), 6.0),
                (msg_key("session-A/agent-x", "msg-4"), 5.0),
            ],
            weight: 0.135,
        };
        let vector = RankedList {
            retriever: RetrieverKind::Vector,
            entries: vec![
                (msg_key("session-B", "msg-7"), 0.9),
                (msg_key("session-A", "msg-9"), 0.6),
            ],
            weight: 1.0,
        };

        let merged = fuse_arms(&[fts, vector]);
        assert_eq!(merged.len(), 2);

        let top = &merged[0];
        assert_eq!(top.key.session_id, "session-B");
        // The representative key comes from the first arm that surfaced the root.
        assert_eq!(top.key.message_id, "msg-3");
        assert_eq!(top.matched_via, vec!["fts", "vector"]);

        let runner_up = &merged[1];
        assert_eq!(runner_up.key.session_id, "session-A");
        assert_eq!(runner_up.key.message_id, "msg-1");
        assert_eq!(runner_up.matched_via, vec!["fts", "vector"]);
    }

    #[test]
    fn fuse_arms_collapses_degenerate_tied_arm_to_zero_contribution() {
        // Every FTS score is equal, so the arm has zero range and credits nothing.
        let tied_fts = RankedList {
            retriever: RetrieverKind::Fts,
            entries: vec![
                (msg_key("session-A", "a"), 1.0),
                (msg_key("session-B", "b"), 1.0),
            ],
            weight: 0.135,
        };
        let vector = RankedList {
            retriever: RetrieverKind::Vector,
            entries: vec![
                (msg_key("session-A", "a"), 0.9),
                (msg_key("session-B", "b"), 0.3),
            ],
            weight: 1.0,
        };

        let merged = fuse_arms(&[tied_fts, vector]);
        assert_eq!(merged[0].key.session_id, "session-A");
        // A scores exactly its full vector weight; B is the vector minimum, so zero.
        assert!((merged[0].score - 1.0).abs() < 1e-9);
        assert!(merged[1].score.abs() < 1e-9);
    }
}
1990}
1991
1992pub use search_handler::{
1993 FusedHit, RankedList, RetrieverKind, SearchMode, SearchPlan, build_filter, explain_search_plan,
1994 fuse_arms, hit_payload, plan_search, pond_search,
1995};
1996
#[cfg(test)]
mod tests {
    //! Tests for search planning, filter push-down, rank fusion and hit rendering.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::wire::{ProjectFilter, SearchFilters, SearchRequest};
    use chrono::Utc;

    /// Baseline request in the `local` namespace: default filters, limit 20, no cursor.
    /// Tests mutate individual fields from here.
    fn search_request(query: &str) -> SearchRequest {
        SearchRequest {
            protocol_version: crate::PROTOCOL_VERSION,
            namespace: Some("local".to_owned()),
            query: query.to_owned(),
            mode_override: None,
            similar_to: None,
            filters: SearchFilters::default(),
            limit: 20,
            cursor: None,
        }
    }

    /// Shorthand for a `MessageKey` fixture.
    fn key(session: &str, id: &str) -> crate::sessions::MessageKey {
        crate::sessions::MessageKey {
            session_id: session.to_owned(),
            message_id: id.to_owned(),
        }
    }

    // session-a and session-b appear in both arms; session-c only in the vector arm and
    // session-d only in the FTS arm.
    #[test]
    fn fuse_arms_fuses_retrievers_and_reports_provenance() {
        let lists = [
            RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![
                    (key("session-a", "a"), 0.9),
                    (key("session-b", "b"), 0.7),
                    (key("session-c", "c"), 0.5),
                ],
                weight: 1.0,
            },
            RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![
                    (key("session-b", "b"), 10.0),
                    (key("session-a", "a"), 8.0),
                    (key("session-d", "d"), 4.0),
                ],
                weight: 0.135,
            },
        ];
        let merged = fuse_arms(&lists);

        // session-a gets the full vector weight plus part of the FTS weight, which beats
        // session-b's half vector weight plus the full FTS weight.
        assert_eq!(merged[0].key.session_id, "session-a");
        assert_eq!(merged[1].key.session_id, "session-b");
        assert_eq!(merged[0].matched_via, vec!["vector", "fts"]);
        assert!(merged[0].score > merged[1].score);

        // Single-arm roots still appear, each crediting only its own retriever.
        let c = merged
            .iter()
            .find(|hit| hit.key.session_id == "session-c")
            .unwrap();
        assert_eq!(c.matched_via, vec!["vector"]);
        let d = merged
            .iter()
            .find(|hit| hit.key.session_id == "session-d")
            .unwrap();
        assert_eq!(d.matched_via, vec!["fts"]);
    }

    #[test]
    fn hit_payload_returns_short_text_in_full() {
        let short = "a short message body";
        let text = hit_payload(short, "message");
        assert_eq!(text, short, "small text is returned as-is");
    }

    #[test]
    fn hit_payload_windows_long_text_around_the_query_term() {
        // The match sits far past the start, so the window must move to reach it.
        let body = format!("{}NEEDLE{}", "a".repeat(2000), "b".repeat(394));
        let text = hit_payload(&body, "needle");
        assert!(
            text.contains("NEEDLE"),
            "text is the match-windowed snippet: {text}"
        );
        // 600 is hard-coded here: HIT_SNIPPET_CHARS lives in the search handler module
        // and is not re-exported to this test module.
        assert!(
            text.chars().count() <= 600 + 64,
            "snippet window is bounded by HIT_SNIPPET_CHARS plus markers: {}",
            text.chars().count()
        );
    }

    #[test]
    fn hit_payload_snippet_survives_case_folding_that_changes_byte_length() {
        // 'İ' lowercases to a longer sequence, so byte offsets in the lowercased text
        // differ from those in the original.
        let body = format!("İÉÉÉ{}", "a".repeat(2100));
        let text = hit_payload(&body, "ééé");
        assert!(
            text.contains("ÉÉÉ"),
            "snippet windows on the matched term: {text}"
        );
    }

    // Chain a -> b -> c nests two levels below `a`, which restore_lineage rejects.
    // Starting from `b`, it returns [b, c].
    #[tokio::test]
    async fn restore_lineage_rejects_a_graph_nesting_deeper_than_one_level() {
        use crate::adapter::Extracted;
        use crate::sessions::Store;
        use crate::wire::{ProviderOptions, Session};
        use tempfile::TempDir;

        // Session fixture with an optional parent link.
        let session = |id: &str, parent: Option<&str>| Session {
            id: id.to_owned(),
            parent_session_id: parent.map(str::to_owned),
            parent_message_id: None,
            source_agent: "claude-code".to_owned(),
            created_at: Utc::now(),
            project: Extracted::from_test_value("/tmp/pond".to_owned()),
            options: ProviderOptions::new(),
        };

        let dir = TempDir::new().unwrap();
        let store = Store::open_local(dir.path()).await.unwrap();
        store
            .upsert_sessions(&[
                session("a", None),
                session("b", Some("a")),
                session("c", Some("b")),
            ])
            .await
            .unwrap();

        let err = restore_lineage(&store, "a").await.unwrap_err();
        assert!(
            err.to_string().contains("one subagent level"),
            "expected the deeper-graph error, got: {err}"
        );

        let lineage = restore_lineage(&store, "b").await.unwrap();
        let ids: Vec<&str> = lineage.iter().map(|s| s.session.id.as_str()).collect();
        assert_eq!(ids, ["b", "c"]);
    }

    #[test]
    fn build_filter_pushes_down_each_predicate_and_handles_empty() {
        let filters = SearchFilters {
            project: Some(ProjectFilter::Contains("/Users/me/pond".to_owned())),
            session_id: Some("01HXY".to_owned()),
            source_agent: Some("claude-code".to_owned()),
            role: Some("assistant".to_owned()),
            from_date: Some("2026-01-01".to_owned()),
            to_date: Some("2026-05-01".to_owned()),
            min_score: 0.0,
        };
        let sql = build_filter(&filters).unwrap().to_lance();
        assert!(sql.contains("project LIKE '%/Users/me/pond%'"));
        assert!(sql.contains("session_id = '01HXY'"));
        assert!(sql.contains("source_agent = 'claude-code'"));
        assert!(sql.contains("role = 'assistant'"));
        assert!(sql.contains("timestamp >="));
        assert!(sql.contains("timestamp <="));

        // No filters at all render to an empty predicate string.
        assert_eq!(
            build_filter(&SearchFilters::default()).unwrap().to_lance(),
            "",
        );
    }

    #[test]
    fn build_filter_rejects_bad_role_and_date() {
        let bad_role = SearchFilters {
            role: Some("wizard".to_owned()),
            ..SearchFilters::default()
        };
        assert!(build_filter(&bad_role).is_err());

        // Day-first ordering is not the accepted YYYY-MM-DD form.
        let bad_date = SearchFilters {
            from_date: Some("01-01-2026".to_owned()),
            ..SearchFilters::default()
        };
        assert!(build_filter(&bad_date).is_err());
    }

    #[test]
    fn build_filter_contains_escapes_like_wildcards() {
        // `_` is a LIKE single-character wildcard, so a literal underscore in the
        // project path must be escaped and the escape character declared.
        let filters = SearchFilters {
            project: Some(ProjectFilter::Contains("/Users/me/my_project".to_owned())),
            ..SearchFilters::default()
        };
        let sql = build_filter(&filters).unwrap().to_lance();
        assert!(
            sql.contains(r"my\_project"),
            "underscore must be escaped: {sql}"
        );
        assert!(
            sql.contains(r"ESCAPE '\'"),
            "predicate must declare the escape char: {sql}"
        );
    }

    #[test]
    fn plan_search_shapes_request_for_each_planning_input() {
        // Query is trimmed; limit is capped at 200; pool = 5 * limit; vector_pool =
        // 2 * pool; min_score is carried through from the filters.
        let mut request = search_request(" vector memory ");
        request.limit = 500;
        request.filters.min_score = 0.42;
        let plan = plan_search(request, SearchMode::Hybrid).unwrap();
        assert_eq!(plan.mode, SearchMode::Hybrid);
        assert_eq!(plan.query, "vector memory");
        assert_eq!(plan.limit, 200);
        assert_eq!(plan.pool, 1000);
        assert_eq!(plan.vector_pool, 2000);
        assert_eq!(plan.min_score, 0.42);

        // A tiny limit still gets the pool floor of 50.
        let mut request = search_request("tiny pool");
        request.limit = 1;
        let plan = plan_search(request, SearchMode::Fts).unwrap();
        assert_eq!(plan.mode, SearchMode::Fts);
        assert_eq!(plan.limit, 1);
        assert_eq!(plan.pool, 50);
        assert_eq!(plan.vector_pool, 100);

        // Filters are compiled into the plan's predicate.
        let mut request = search_request("filtered");
        request.filters.project = Some(ProjectFilter::Contains("/Users/me/pond".to_owned()));
        request.filters.role = Some("assistant".to_owned());
        let plan = plan_search(request, SearchMode::Fts).unwrap();
        let sql = plan.filter.to_lance();
        assert!(sql.contains("project LIKE"));
        assert!(sql.contains("role = 'assistant'"));
    }

    #[test]
    fn plan_search_rejects_invalid_composition_before_execution() {
        // Blank query with no `similar_to` is rejected on the `query` field.
        let mut blank = search_request(" ");
        let error = plan_search(blank.clone(), SearchMode::Fts)
            .unwrap_err()
            .error;
        assert_eq!(error.code, crate::wire::ErrorCode::ValidationFailed);
        assert_eq!(error.details["field"], "query");

        // A zero limit is rejected on the `limit` field.
        blank.query = "valid".to_owned();
        blank.limit = 0;
        let error = plan_search(blank.clone(), SearchMode::Fts)
            .unwrap_err()
            .error;
        assert_eq!(error.details["field"], "limit");

        // Any namespace other than the default is unknown.
        blank.limit = 1;
        blank.namespace = Some("remote".to_owned());
        let error = plan_search(blank, SearchMode::Fts).unwrap_err().error;
        assert_eq!(error.code, crate::wire::ErrorCode::NamespaceUnknown);
        assert_eq!(error.details["namespace"], "remote");
    }
}