1fn map_error(error: crate::Error) -> crate::wire::ErrorEnvelope {
2 error.into()
3}
4
/// A namespace path stored as its list of segments; the root namespace has no segments.
#[derive(Debug, Clone)]
pub struct NamespaceIdent(pub Vec<String>);

impl NamespaceIdent {
    /// Returns the root namespace (an empty segment list).
    pub fn root() -> Self {
        Self(Vec::new())
    }

    /// Returns a new identifier made of this namespace's segments followed by
    /// `table_name` as the last segment. `self` is left unchanged.
    pub fn as_table_id(&self, table_name: &str) -> Vec<String> {
        self.0
            .iter()
            .cloned()
            .chain(std::iter::once(table_name.to_owned()))
            .collect()
    }
}
22
23pub fn resolve_namespace(
27 namespace: Option<&str>,
28) -> Result<NamespaceIdent, crate::wire::ErrorEnvelope> {
29 match namespace {
30 None | Some(crate::wire::DEFAULT_NAMESPACE) => Ok(NamespaceIdent::root()),
31 Some(other) => Err(map_error(crate::Error::namespace_unknown(other))),
32 }
33}
34
35fn map_storage(error: anyhow::Error) -> crate::wire::ErrorEnvelope {
36 if let Some(conflict) = error.downcast_ref::<crate::substrate::ConflictExhausted>() {
39 return map_error(crate::Error::Conflict {
40 attempts: conflict.attempts,
41 });
42 }
43 map_error(crate::Error::Storage(error))
44}
45
mod ingest_handler {
    //! Ingest paths that feed events through an `IngestValidator`:
    //! streaming adapter ingestion ([`ingest_adapter`]) and the batch wire
    //! handler ([`pond_ingest`]).

    use anyhow::Result;
    use tokio_stream::StreamExt;

    use crate::{
        adapter::{Adapter, AdapterYield, SkipOracle, SkipReason},
        sessions::{IngestEvent, IngestSummary, IngestValidator, OutcomeStatus, RowOutcome, Store},
        wire::{
            ErrorBody, ErrorCode, IngestEnvelope, IngestRequest, IngestResponse, IngestResult,
            IngestStatus, validate_protocol,
        },
    };

    use super::{map_error, map_storage};

    /// Maximum number of events accepted in one [`pond_ingest`] request.
    pub const MAX_INGEST_EVENTS: usize = 1000;

    /// Progress notifications passed to the `on_event` callback of [`ingest_adapter`].
    #[derive(Debug, Clone)]
    pub enum SyncEvent {
        /// Emitted once, before any event is read. `total` is the value returned
        /// by `Adapter::discover`, or `None` when discovery failed.
        Discovered { total: Option<usize> },
        /// Emitted once per session: skipped sessions, finished sessions, and
        /// adapter errors that arrive while no session is open.
        SessionDone(SessionOutcome),
    }

    /// Final report for one session (or one unattributable adapter error).
    #[derive(Debug, Clone)]
    pub struct SessionOutcome {
        /// Project of the session; `None` for an adapter error with no open session.
        pub project: Option<String>,
        /// Session id; `None` for an adapter error with no open session.
        pub session_id: Option<String>,
        /// Number of `Message` events read for the session. They are counted on
        /// arrival, before validation, so rejected messages are included.
        pub messages: usize,
        /// How the session ended.
        pub status: SyncStatus,
    }

    /// Per-session result reported through [`SyncEvent::SessionDone`].
    #[derive(Debug, Clone)]
    pub enum SyncStatus {
        /// No session-level rejection and no dropped events.
        Ok,
        /// The session row was not rejected, but some events were dropped.
        /// Drops are adapter errors plus non-session `Error` outcomes returned by
        /// `IngestValidator::push`.
        Partial {
            dropped_events: usize,
            /// Message of the first recorded drop, if one was available.
            first_drop_reason: Option<String>,
        },
        /// An adapter error arrived while no session was open; `reason` is its text.
        Skipped {
            reason: String,
        },
        /// The validator returned an `Error` outcome for the session row itself.
        Rejected {
            reason: String,
        },
        /// The adapter skipped the session with `SkipReason::Fresh`.
        Fresh,
        /// The adapter skipped the session with `SkipReason::Empty`.
        Empty,
    }

    /// Bookkeeping for the session whose events are currently being read.
    #[derive(Debug, Default)]
    struct InFlight {
        project: Option<String>,
        session_id: String,
        messages: usize,
        dropped_events: usize,
        first_drop_reason: Option<String>,
        /// Validator index of this session's `Session` event; used to find the
        /// session row's outcome in `drain_pending_dones`.
        session_index: usize,
    }

    /// A session whose events have all been read but whose `SessionDone` is
    /// deferred until validator outcomes (flush or finish) are available.
    #[derive(Debug)]
    struct PendingDone {
        project: Option<String>,
        session_id: String,
        messages: usize,
        dropped_events: usize,
        first_drop_reason: Option<String>,
        session_index: usize,
    }

    /// Flush the validator once its pending substream count reaches this value.
    const ADAPTER_FLUSH_BATCH: usize = 100;

    /// Streams every event from `adapter` (filtered by `oracle`) into `store`,
    /// reporting progress through `on_event`.
    ///
    /// Finished sessions are reported only after a validator flush (or the final
    /// `finish`) has produced outcomes, so the reported status can reflect a
    /// session-level rejection. Adapter event errors are not propagated. They are
    /// counted against the open session, or reported as `Skipped` when no
    /// session is open. At the end, a timing and count breakdown is logged at
    /// info level on the `pond::perf` target.
    ///
    /// # Errors
    /// Returns the first error from `IngestValidator::push`, `flush`, or `finish`.
    pub async fn ingest_adapter<F>(
        store: &Store,
        adapter: &dyn Adapter,
        oracle: &dyn SkipOracle,
        mut on_event: F,
    ) -> Result<IngestSummary>
    where
        F: FnMut(SyncEvent),
    {
        let mut summary = IngestSummary::default();
        // Snapshot the truncation counter so only this run's truncations are reported.
        let truncations_before = crate::adapter::extract::truncated_values_count();
        // Discovery is best-effort: a failure is logged at debug level and
        // reported as an unknown total.
        let total = adapter
            .discover()
            .await
            .map_err(|error| tracing::debug!(%error, "adapter discover failed"))
            .ok();
        on_event(SyncEvent::Discovered { total });

        let mut events = adapter.events_with(oracle);
        let mut validator = IngestValidator::default();
        // Index handed to the validator; advanced only for `AdapterYield::Event`.
        let mut index = 0usize;
        // The session currently being read, if any.
        let mut in_flight: Option<InFlight> = None;
        // Closed sessions waiting for the outcomes of the next flush/finish.
        let mut pending_dones: std::collections::VecDeque<PendingDone> =
            std::collections::VecDeque::new();
        // Timing accumulators for the `pond::perf` log line emitted at the end.
        let mut decode_total = std::time::Duration::ZERO;
        let mut decode_count = 0u64;
        let mut validator_total = std::time::Duration::ZERO;
        let mut validator_count = 0u64;
        let run_started = std::time::Instant::now();

        loop {
            // Time spent waiting on the adapter stream. The terminating `None`
            // poll is also timed and counted.
            let decode_start = std::time::Instant::now();
            let next = events.next().await;
            decode_total += decode_start.elapsed();
            decode_count += 1;
            let event = match next {
                Some(event) => event,
                None => break,
            };
            match event {
                // The adapter declined this session. It never reaches the
                // validator, so its `SessionDone` is emitted right away.
                Ok(AdapterYield::Skipped {
                    session_id,
                    project,
                    reason,
                }) => {
                    let status = match reason {
                        SkipReason::Fresh => {
                            summary.skipped_fresh += 1;
                            SyncStatus::Fresh
                        }
                        SkipReason::Empty => {
                            summary.skipped_empty += 1;
                            SyncStatus::Empty
                        }
                    };
                    on_event(SyncEvent::SessionDone(SessionOutcome {
                        project,
                        session_id,
                        messages: 0,
                        status,
                    }));
                }
                Ok(AdapterYield::Event(event)) => {
                    // A new `Session` event closes the previous in-flight session,
                    // which then waits in the queue for its session row's outcome.
                    if matches!(&event, IngestEvent::Session(_))
                        && let Some(prev) = in_flight.take()
                    {
                        pending_dones.push_back(PendingDone {
                            project: prev.project,
                            session_id: prev.session_id,
                            messages: prev.messages,
                            dropped_events: prev.dropped_events,
                            first_drop_reason: prev.first_drop_reason,
                            session_index: prev.session_index,
                        });
                    }
                    let event_index = index;
                    match &event {
                        IngestEvent::Session(session) => {
                            in_flight = Some(InFlight {
                                project: Some((*session.project).clone()),
                                session_id: session.id.clone(),
                                messages: 0,
                                dropped_events: 0,
                                first_drop_reason: None,
                                session_index: event_index,
                            });
                        }
                        IngestEvent::Message(_) => {
                            // Counted on arrival, whether or not the row is later rejected.
                            if let Some(slot) = in_flight.as_mut() {
                                slot.messages += 1;
                            }
                        }
                        IngestEvent::Part(_) => {}
                    }

                    let validator_start = std::time::Instant::now();
                    let push_outcomes = validator.push(store, index, event).await?;
                    validator_total += validator_start.elapsed();
                    validator_count += 1;
                    // Row-level (non-session) rejections from this push are charged
                    // to the in-flight session.
                    // NOTE(review): if `push` can return outcomes for rows of an
                    // earlier session, they are attributed to the current one here.
                    // A session-kind `Error` returned by `push` is also never seen
                    // by `drain_pending_dones`. Confirm the validator's contract.
                    for outcome in &push_outcomes {
                        if matches!(outcome.status, OutcomeStatus::Error)
                            && outcome.kind != "session"
                            && let Some(slot) = in_flight.as_mut()
                        {
                            slot.dropped_events += 1;
                            if slot.first_drop_reason.is_none() {
                                slot.first_drop_reason =
                                    outcome.error.as_ref().map(|err| err.message.clone());
                            }
                        }
                    }
                    summary.add_outcomes(&push_outcomes);
                    index += 1;

                    // Bound buffered work. Flush outcomes are also the point at
                    // which queued sessions get their `SessionDone`.
                    if validator.pending_substreams() >= ADAPTER_FLUSH_BATCH {
                        let flush_start = std::time::Instant::now();
                        let flush_outcomes = validator.flush(store).await?;
                        validator_total += flush_start.elapsed();
                        validator_count += 1;
                        summary.add_outcomes(&flush_outcomes);
                        drain_pending_dones(&mut pending_dones, &flush_outcomes, &mut on_event);
                    }
                }
                Err(error) => {
                    // Adapter errors never abort the run.
                    tracing::debug!(
                        %error,
                        "adapter event error (per-line drop by design)"
                    );
                    match in_flight.as_mut() {
                        // Inside a session: record a dropped event against it.
                        Some(slot) => {
                            slot.dropped_events += 1;
                            if slot.first_drop_reason.is_none() {
                                slot.first_drop_reason = Some(error.to_string());
                            }
                            summary.dropped_events += 1;
                        }
                        // No session open: report the error as a skipped input.
                        None => {
                            summary.skipped_files += 1;
                            on_event(SyncEvent::SessionDone(SessionOutcome {
                                project: None,
                                session_id: None,
                                messages: 0,
                                status: SyncStatus::Skipped {
                                    reason: error.to_string(),
                                },
                            }));
                        }
                    }
                }
            }
        }

        // Stream exhausted: close the last session and drain everything still queued.
        if let Some(prev) = in_flight.take() {
            pending_dones.push_back(PendingDone {
                project: prev.project,
                session_id: prev.session_id,
                messages: prev.messages,
                dropped_events: prev.dropped_events,
                first_drop_reason: prev.first_drop_reason,
                session_index: prev.session_index,
            });
        }
        let validator_start = std::time::Instant::now();
        let final_outcomes = validator.finish(store).await?;
        validator_total += validator_start.elapsed();
        validator_count += 1;
        summary.add_outcomes(&final_outcomes);
        drain_pending_dones(&mut pending_dones, &final_outcomes, &mut on_event);

        summary.truncated_values = crate::adapter::extract::truncated_values_count()
            .saturating_sub(truncations_before) as usize;

        // `other_ms` is wall time not spent in adapter polls or validator calls.
        let total = run_started.elapsed();
        let other = total
            .saturating_sub(decode_total)
            .saturating_sub(validator_total);
        tracing::info!(
            target: "pond::perf",
            total_ms = total.as_millis() as u64,
            decode_ms = decode_total.as_millis() as u64,
            validator_ms = validator_total.as_millis() as u64,
            other_ms = other.as_millis() as u64,
            decode_calls = decode_count,
            validator_calls = validator_count,
            rows_inserted = summary.inserted as u64,
            rows_matched = summary.matched as u64,
            dropped_events = summary.dropped_events as u64,
            dropped_sessions = summary.dropped_sessions as u64,
            skipped_files = summary.skipped_files as u64,
            skipped_fresh = summary.skipped_fresh as u64,
            truncated_values = summary.truncated_values as u64,
            "ingest_adapter complete"
        );
        Ok(summary)
    }

    /// Emits `SessionDone` for every queued session and empties `queue`.
    ///
    /// Status precedence for each session:
    /// 1. `Rejected`, when `outcomes` contains an `Error` outcome of kind
    ///    `"session"` at the session's index (its message, or
    ///    "session-level rejection" when no error body is attached).
    /// 2. `Partial`, when events were dropped.
    /// 3. `Ok` otherwise.
    ///
    /// Only `outcomes` from this call are consulted.
    fn drain_pending_dones<F>(
        queue: &mut std::collections::VecDeque<PendingDone>,
        outcomes: &[RowOutcome],
        on_event: &mut F,
    ) where
        F: FnMut(SyncEvent),
    {
        // Index session-row outcomes by their event index.
        let mut session_outcome_by_index: std::collections::HashMap<usize, &RowOutcome> =
            std::collections::HashMap::new();
        for outcome in outcomes {
            if outcome.kind == "session" {
                session_outcome_by_index.insert(outcome.index, outcome);
            }
        }

        while let Some(done) = queue.pop_front() {
            let session_outcome = session_outcome_by_index.get(&done.session_index).copied();
            let rejection_reason = session_outcome.and_then(|outcome| {
                if matches!(outcome.status, OutcomeStatus::Error) {
                    Some(
                        outcome
                            .error
                            .as_ref()
                            .map(|err| err.message.clone())
                            .unwrap_or_else(|| "session-level rejection".to_owned()),
                    )
                } else {
                    None
                }
            });
            let status = if let Some(reason) = rejection_reason {
                SyncStatus::Rejected { reason }
            } else if done.dropped_events > 0 {
                SyncStatus::Partial {
                    dropped_events: done.dropped_events,
                    first_drop_reason: done.first_drop_reason,
                }
            } else {
                SyncStatus::Ok
            };
            on_event(SyncEvent::SessionDone(SessionOutcome {
                project: done.project,
                session_id: Some(done.session_id),
                messages: done.messages,
                status,
            }));
        }
    }

    /// Wire handler for an ingest request.
    ///
    /// Checks run in this order before anything is written:
    /// 1. protocol version;
    /// 2. namespace;
    /// 3. a non-empty `events` array;
    /// 4. at most [`MAX_INGEST_EVENTS`] events.
    ///
    /// A storage failure becomes an error envelope (via `map_storage`).
    /// Per-row outcomes are returned in `results` and tallied into `accepted`
    /// (inserted or matched) and `rejected` (error).
    pub async fn pond_ingest(store: &Store, request: IngestRequest) -> IngestEnvelope {
        if let Err(envelope) = validate_protocol(request.protocol_version) {
            return IngestEnvelope::Error(envelope);
        }
        if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
            return IngestEnvelope::Error(envelope);
        }
        if request.events.is_empty() {
            return IngestEnvelope::Error(map_error(crate::Error::validation_field(
                "events must be a non-empty array",
                "events",
                Some(serde_json::json!([])),
                Some("non-empty array".to_owned()),
            )));
        }
        if request.events.len() > MAX_INGEST_EVENTS {
            return IngestEnvelope::Error(map_error(crate::Error::validation_field(
                format!("ingest batch exceeds the event cap: at most {MAX_INGEST_EVENTS} events"),
                "events",
                Some(serde_json::json!(request.events.len())),
                Some(format!("at most {MAX_INGEST_EVENTS} events")),
            )));
        }

        match ingest_events(store, request.events).await {
            Ok(outcomes) => {
                let mut accepted = 0;
                let mut rejected = 0;
                for outcome in &outcomes {
                    match outcome.status {
                        OutcomeStatus::Inserted | OutcomeStatus::Matched => accepted += 1,
                        OutcomeStatus::Error => rejected += 1,
                    }
                }
                let results = outcomes
                    .into_iter()
                    .map(outcome_to_result)
                    .collect::<Vec<_>>();
                IngestEnvelope::Success(IngestResponse {
                    accepted,
                    rejected,
                    results,
                })
            }
            Err(failure) => IngestEnvelope::Error(map_storage(failure)),
        }
    }

    /// Pushes `events` through a fresh `IngestValidator` in order, using each
    /// event's position as its index, then calls `finish`.
    ///
    /// Returns every outcome collected from the pushes and from `finish`, sorted
    /// by `index` with a stable sort.
    ///
    /// # Errors
    /// Returns the first error from `push` or `finish`.
    pub async fn ingest_events(store: &Store, events: Vec<IngestEvent>) -> Result<Vec<RowOutcome>> {
        let mut validator = IngestValidator::default();
        let mut outcomes = Vec::with_capacity(events.len());
        for (index, event) in events.into_iter().enumerate() {
            let mut chunk = validator.push(store, index, event).await?;
            outcomes.append(&mut chunk);
        }
        let mut tail = validator.finish(store).await?;
        outcomes.append(&mut tail);
        outcomes.sort_by_key(|outcome| outcome.index);
        Ok(outcomes)
    }

    /// Converts a validator outcome into its wire result.
    ///
    /// Error outcomes become an `ErrorBody` with `ErrorCode::ValidationFailed`.
    /// Its `details` object holds `field` and `reason` when present. Without any
    /// error payload, a generic "ingest failed" body is used.
    fn outcome_to_result(outcome: RowOutcome) -> IngestResult {
        let (status, error) = match (outcome.status, outcome.error) {
            (OutcomeStatus::Inserted, _) => (IngestStatus::Inserted, None),
            (OutcomeStatus::Matched, _) => (IngestStatus::Matched, None),
            (OutcomeStatus::Error, error) => {
                let body = error
                    .map(|err| {
                        let mut details = serde_json::Map::new();
                        if let Some(field) = err.field {
                            details.insert("field".to_owned(), serde_json::json!(field));
                        }
                        if let Some(reason) = err.reason {
                            details.insert("reason".to_owned(), serde_json::json!(reason));
                        }
                        ErrorBody {
                            code: ErrorCode::ValidationFailed,
                            message: err.message,
                            details: serde_json::Value::Object(details),
                        }
                    })
                    .unwrap_or_else(|| ErrorBody {
                        code: ErrorCode::ValidationFailed,
                        message: "ingest failed".to_owned(),
                        details: serde_json::json!({}),
                    });
                (IngestStatus::Error, Some(body))
            }
        };
        IngestResult {
            index: outcome.index,
            kind: outcome.kind.to_owned(),
            pk: outcome.pk,
            status,
            error,
        }
    }
}
580
581pub use crate::sessions::{IngestEvent, IngestSummary, IngestValidator, search_text};
582pub use ingest_handler::{
583 MAX_INGEST_EVENTS, SessionOutcome, SyncEvent, SyncStatus, ingest_adapter, ingest_events,
584 pond_ingest,
585};
586
587mod export_handler {
588 use anyhow::{Context, Result};
599 use tokio::io::{AsyncWrite, AsyncWriteExt};
600
601 use crate::sessions::{IngestEvent, Store};
602
603 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
604 pub struct ExportSummary {
605 pub sessions: usize,
606 pub messages: usize,
607 pub parts: usize,
608 }
609
610 pub async fn pond_export<W>(
611 store: &Store,
612 session_filter: Option<&str>,
613 writer: &mut W,
614 ) -> Result<ExportSummary>
615 where
616 W: AsyncWrite + Unpin,
617 {
618 let mut session_ids = match session_filter {
619 Some(id) => vec![id.to_owned()],
620 None => store.session_ids().await?,
621 };
622 session_ids.sort();
623
624 let mut summary = ExportSummary::default();
625 for session_id in session_ids {
626 let Some(stored) = store
627 .get_session(&session_id)
628 .await
629 .with_context(|| format!("export: failed to load session {session_id}"))?
630 else {
631 if session_filter.is_some() {
632 anyhow::bail!("export: session not found: {session_id}");
633 }
634 continue;
635 };
636 write_event(writer, &IngestEvent::Session(stored.session)).await?;
637 summary.sessions += 1;
638 for message_with_parts in stored.messages {
639 write_event(writer, &IngestEvent::Message(message_with_parts.message)).await?;
640 summary.messages += 1;
641 for part in message_with_parts.parts {
642 write_event(writer, &IngestEvent::Part(part)).await?;
643 summary.parts += 1;
644 }
645 }
646 }
647 writer.flush().await.context("export: flush failed")?;
648 Ok(summary)
649 }
650
651 async fn write_event<W>(writer: &mut W, event: &IngestEvent) -> Result<()>
652 where
653 W: AsyncWrite + Unpin,
654 {
655 let line = serde_json::to_string(event).context("export: serialize event")?;
656 writer
657 .write_all(line.as_bytes())
658 .await
659 .context("export: write event")?;
660 writer
661 .write_all(b"\n")
662 .await
663 .context("export: write newline")?;
664 Ok(())
665 }
666}
667
668pub use export_handler::{ExportSummary, pond_export};
669
670mod restore_handler {
671 use anyhow::{Context, Result, bail};
678
679 use crate::sessions::{SessionWithMessages, Store};
680
681 pub async fn restore_lineage(
682 store: &Store,
683 session_id: &str,
684 ) -> Result<Vec<SessionWithMessages>> {
685 let Some(parent) = store.get_session(session_id).await? else {
686 bail!("export: session not found: {session_id}");
687 };
688 let mut sessions = vec![parent];
689 for child in store.child_sessions(session_id).await? {
690 if !store.child_sessions(&child.id).await?.is_empty() {
691 bail!(
692 "adapter-lineage-complete-restore supports one subagent level; session {} has child sessions",
693 child.id
694 );
695 }
696 let child_id = child.id;
697 let stored = store
698 .get_session(&child_id)
699 .await?
700 .with_context(|| format!("export: child session disappeared: {child_id}"))?;
701 sessions.push(stored);
702 }
703 Ok(sessions)
704 }
705}
706
707pub use restore_handler::restore_lineage;
708
709mod get_handler {
710 use crate::{
711 sessions::{GetLookup, MessageViewParams, RetrievedMessage, SessionViewParams, Store},
712 wire::{
713 GetEnvelope, GetRequest, GetResponse, GetResult, GetSession, MessageView, PartSummary,
714 ResponseMode, ResponsePart, validate_protocol,
715 },
716 };
717
718 use super::{map_error, map_storage};
719
720 fn to_message_view(message: RetrievedMessage, verbatim: bool) -> MessageView {
725 if verbatim {
726 return MessageView {
727 id: message.id,
728 role: message.role,
729 timestamp: message.timestamp,
730 text: None,
731 content: None,
732 parts_summary: Vec::new(),
733 parts: Some(
734 message
735 .parts
736 .into_iter()
737 .map(ResponsePart::from_part)
738 .collect(),
739 ),
740 };
741 }
742 let parts_summary = message
743 .parts
744 .iter()
745 .filter_map(|part| PartSummary::for_kind(&part.kind))
746 .collect();
747 MessageView {
748 id: message.id,
749 role: message.role,
750 timestamp: message.timestamp,
751 text: message.text,
752 content: message.content,
753 parts_summary,
754 parts: None,
755 }
756 }
757
758 const BUDGET_BYTES: usize = 200_000;
763
764 pub async fn pond_get(store: &Store, request: GetRequest) -> GetEnvelope {
765 if let Err(error) = validate_protocol(request.protocol_version) {
766 return GetEnvelope::Error(error);
767 }
768 if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
769 return GetEnvelope::Error(envelope);
770 }
771
772 let response = match (&request.session_id, &request.message_id) {
773 (Some(session_id), None) => session_result(store, session_id, &request).await,
774 (None, Some(message_id)) => message_result(store, message_id, &request).await,
775 (Some(_), Some(_)) => Err(map_error(crate::Error::validation_field(
776 "session_id and message_id are mutually exclusive",
777 "message_id",
778 request.message_id.clone().map(serde_json::Value::String),
779 Some("omit when session_id is present".to_owned()),
780 ))),
781 (None, None) => Err(map_error(crate::Error::validation(
782 "one of session_id or message_id is required",
783 ))),
784 };
785
786 match response {
787 Ok(response) => GetEnvelope::Success(response),
788 Err(error) => GetEnvelope::Error(error),
789 }
790 }
791
792 fn unknown_after_id(request: &GetRequest, anchor_of: &str) -> crate::wire::ErrorEnvelope {
795 map_error(crate::Error::validation_field(
796 "after_id not found (stale or mistyped continuation anchor)",
797 "after_id",
798 request.after_id.clone().map(serde_json::Value::String),
799 Some(format!("a {anchor_of} from a prior page of this read")),
800 ))
801 }
802
803 async fn session_result(
804 store: &Store,
805 session_id: &str,
806 request: &GetRequest,
807 ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
808 let params = SessionViewParams {
809 mode: request.response_mode,
810 after_id: request.after_id.as_deref(),
811 limit: request.limit,
812 budget_bytes: BUDGET_BYTES,
813 };
814 let view = match store
815 .session_view(session_id, params)
816 .await
817 .map_err(map_storage)?
818 {
819 GetLookup::NotFound => {
820 return Err(map_error(crate::Error::not_found(
821 "session",
822 serde_json::json!(session_id),
823 format!("session not found: {session_id}"),
824 )));
825 }
826 GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "message id")),
827 GetLookup::Found(view) => view,
828 };
829 let verbatim = matches!(request.response_mode, ResponseMode::Verbatim);
830 Ok(GetResponse {
831 session: GetSession::from_session(&view.session),
832 result: GetResult::Session {
833 messages: view
834 .messages
835 .into_iter()
836 .map(|message| to_message_view(message, verbatim))
837 .collect(),
838 messages_remaining: view.messages_remaining,
839 },
840 })
841 }
842
843 async fn message_result(
844 store: &Store,
845 message_id: &str,
846 request: &GetRequest,
847 ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
848 let params = MessageViewParams {
849 context_depth: request.context_depth,
850 after_id: request.after_id.as_deref(),
851 limit: request.limit,
852 budget_bytes: BUDGET_BYTES,
853 };
854 let view = match store
855 .message_view(message_id, params)
856 .await
857 .map_err(map_storage)?
858 {
859 GetLookup::NotFound => {
860 return Err(map_error(crate::Error::not_found(
861 "message",
862 serde_json::json!(message_id),
863 format!("message not found: {message_id}"),
864 )));
865 }
866 GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "part id")),
867 GetLookup::Found(view) => view,
868 };
869 let target = MessageView {
872 id: view.target.id,
873 role: view.target.role,
874 timestamp: view.target.timestamp,
875 text: None,
876 content: None,
877 parts_summary: Vec::new(),
878 parts: None,
879 };
880 Ok(GetResponse {
881 session: GetSession::from_session(&view.session),
882 result: GetResult::Message {
883 target,
884 target_parts: view
885 .target_parts
886 .into_iter()
887 .map(ResponsePart::from_part)
888 .collect(),
889 target_parts_remaining: view.target_parts_remaining,
890 siblings: view
891 .siblings
892 .into_iter()
893 .map(|sibling| to_message_view(sibling, false))
894 .collect(),
895 },
896 })
897 }
898}
899
900pub use get_handler::pond_get;
901
902mod search_handler {
903 use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
908
909 use crate::{
910 Clock, SystemClock,
911 embed::{Embedder, LazyEmbedder, format_query},
912 sessions::{MessageKey, MessageMeta, Store},
913 substrate::{Predicate, ScalarValue},
914 wire::{
915 ErrorEnvelope, PartSummary, ProjectFilter, Role, SearchCursor, SearchEnvelope,
916 SearchFilters, SearchRequest, SearchResponse, SearchResult, SearchSession,
917 validate_protocol,
918 },
919 };
920 use chrono::NaiveDate;
921 use std::collections::HashMap;
922
923 use super::{map_error, map_storage};
924
/// Retrieval strategy used by a search.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchMode {
    /// Full-text and vector results fused into one ranking.
    Hybrid,
    /// Full-text search only.
    Fts,
    /// Vector similarity search only.
    Vector,
}
935
936 #[derive(Debug, Clone, PartialEq)]
937 pub struct SearchPlan {
938 pub mode: SearchMode,
939 pub query: String,
940 pub similar_to: Option<String>,
944 pub filter: Predicate,
945 pub filters: SearchFilters,
946 pub pool: usize,
947 pub vector_pool: usize,
948 pub limit: usize,
949 pub offset: usize,
950 pub min_score: f64,
951 }
952
/// Largest page size `plan_search` will honour; larger `limit`s are clamped.
const LIMIT_CAP: usize = 200;
// The next three limits are consumed outside this excerpt; semantics presumably
// follow their names — confirm at the use sites.
const MAX_MATCHES_PER_SESSION: usize = 3;
const SEARCH_BUDGET_BYTES: usize = 60_000;
const HIT_SNIPPET_CHARS: usize = 600;

/// Weight of the full-text arm when hybrid results are fused.
const FTS_FUSION_WEIGHT: f64 = 0.135;
/// Weight of the vector arm when hybrid results are fused.
const VECTOR_FUSION_WEIGHT: f64 = 1.0;
/// Sum of both fusion weights.
const SCORE_DENOMINATOR: f64 = FTS_FUSION_WEIGHT + VECTOR_FUSION_WEIGHT;
972
973 fn encode_search_cursor(cursor: &SearchCursor) -> String {
974 #[allow(clippy::expect_used)]
975 let bytes = serde_json::to_vec(cursor).expect("search cursor encodes as JSON");
976 URL_SAFE_NO_PAD.encode(bytes)
977 }
978
979 fn decode_search_cursor(raw: &str) -> Result<SearchCursor, ErrorEnvelope> {
980 let bytes = URL_SAFE_NO_PAD.decode(raw).map_err(|_| {
981 map_error(crate::Error::validation_field(
982 "cursor is malformed (expected opaque value from a prior response)",
983 "cursor",
984 Some(serde_json::json!(raw)),
985 Some("opaque base64url".to_owned()),
986 ))
987 })?;
988 serde_json::from_slice(&bytes).map_err(|_| {
989 map_error(crate::Error::validation_field(
990 "cursor is malformed (decode failed)",
991 "cursor",
992 Some(serde_json::json!(raw)),
993 Some("opaque cursor from a prior response".to_owned()),
994 ))
995 })
996 }
997
998 pub async fn pond_search(
1007 store: &Store,
1008 embedder: &LazyEmbedder,
1009 request: SearchRequest,
1010 search: &crate::config::SearchConfig,
1011 ) -> SearchEnvelope {
1012 match run_search(store, embedder, request, search, &SystemClock).await {
1013 Ok(response) => SearchEnvelope::Success(response),
1014 Err(envelope) => SearchEnvelope::Error(envelope),
1015 }
1016 }
1017
1018 pub async fn explain_search_plan(
1019 store: &Store,
1020 embedder: &LazyEmbedder,
1021 request: SearchRequest,
1022 search: &crate::config::SearchConfig,
1023 ) -> Result<String, ErrorEnvelope> {
1024 let override_mode = request.mode_override.map(wire_mode_to_internal);
1025 let mut plan = plan_search(request, SearchMode::Fts)?;
1026 plan.mode = resolve_effective_mode(store, override_mode).await?;
1027 let mut out = String::new();
1028 if matches!(plan.mode, SearchMode::Fts | SearchMode::Hybrid) {
1029 let fts = store
1030 .explain_fts_plan(&plan.query, plan.pool, &plan.filter)
1031 .await
1032 .map_err(map_storage)?;
1033 out.push_str("fts:\n");
1034 out.push_str(&fts);
1035 out.push('\n');
1036 }
1037 if matches!(plan.mode, SearchMode::Vector | SearchMode::Hybrid) {
1038 let backend = load_embedder(embedder).await?;
1039 let vector = embed_query(backend.as_ref(), &plan.query)?;
1040 let vector_plan = store
1041 .explain_vector_plan(&vector, plan.vector_pool, &plan.filter, Some(search))
1042 .await
1043 .map_err(map_storage)?;
1044 out.push_str("vector:\n");
1045 out.push_str(&vector_plan);
1046 out.push('\n');
1047 }
1048 Ok(out)
1049 }
1050
1051 async fn run_search(
1052 store: &Store,
1053 embedder: &LazyEmbedder,
1054 request: SearchRequest,
1055 search: &crate::config::SearchConfig,
1056 _clock: &dyn Clock,
1057 ) -> Result<SearchResponse, ErrorEnvelope> {
1058 let override_mode = request.mode_override.map(wire_mode_to_internal);
1059 let mut plan = plan_search(request, SearchMode::Fts)?;
1060
1061 if plan.similar_to.is_some() {
1065 plan.mode = SearchMode::Vector;
1066 } else {
1067 plan.mode = resolve_effective_mode(store, override_mode).await?;
1071 }
1072
1073 let candidates = match plan.mode {
1074 SearchMode::Fts => {
1075 let hits = store
1076 .fts_search(&plan.query, plan.pool, &plan.filter)
1077 .await
1078 .map_err(map_storage)?;
1079 normalize_fts(hits)
1080 }
1081 SearchMode::Hybrid => {
1082 let backend = load_embedder(embedder).await?;
1083 let vector = embed_query(backend.as_ref(), &plan.query)?;
1084 let fts_fut = async {
1087 store
1088 .fts_search(&plan.query, plan.pool, &plan.filter)
1089 .await
1090 .map_err(map_storage)
1091 };
1092 let vector_fut = async {
1093 store
1094 .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
1095 .await
1096 .map_err(map_storage)
1097 };
1098 let (fts, vector_raw) = tokio::try_join!(fts_fut, vector_fut)?;
1099 let fts_max = fts.iter().map(|(_, s)| *s).fold(0.0_f32, f32::max);
1111 let fts_entries: Vec<(MessageKey, f64)> = fts
1112 .into_iter()
1113 .map(|(key, score)| {
1114 let normed = if fts_max > 0.0 {
1115 f64::from(score / fts_max)
1116 } else {
1117 0.0
1118 };
1119 (key, normed)
1120 })
1121 .collect();
1122 let vec_n = vector_raw.len() as f64;
1123 let vector_entries: Vec<(MessageKey, f64)> = vector_raw
1124 .into_iter()
1125 .enumerate()
1126 .map(|(idx, (key, _))| {
1127 let normed = if vec_n > 0.0 {
1128 1.0 - (idx as f64 / vec_n)
1129 } else {
1130 0.0
1131 };
1132 (key, normed)
1133 })
1134 .collect();
1135 let lists = [
1140 RankedList {
1141 retriever: RetrieverKind::Fts,
1142 entries: fts_entries,
1143 weight: FTS_FUSION_WEIGHT,
1144 },
1145 RankedList {
1146 retriever: RetrieverKind::Vector,
1147 entries: vector_entries,
1148 weight: VECTOR_FUSION_WEIGHT,
1149 },
1150 ];
1151 fuse_arms(&lists)
1152 .into_iter()
1153 .map(|hit| Candidate {
1154 session_id: hit.key.session_id,
1155 message_id: hit.key.message_id,
1156 base_score: hit.score,
1157 })
1158 .collect()
1159 }
1160 SearchMode::Vector => {
1167 let vector = if let Some(similar_id) = &plan.similar_to {
1168 let stored = store
1169 .message_vector_by_id(similar_id)
1170 .await
1171 .map_err(map_storage)?;
1172 let Some(vector) = stored else {
1173 return Err(map_error(crate::Error::not_found(
1174 "message",
1175 serde_json::json!(similar_id),
1176 format!(
1177 "no embedded message with id {similar_id} (the message may not \
1178 exist, or it exists but is not yet embedded - run `pond embed`)"
1179 ),
1180 )));
1181 };
1182 vector
1183 } else {
1184 let backend = load_embedder(embedder).await?;
1185 embed_query(backend.as_ref(), &plan.query)?
1186 };
1187 let vector_raw = store
1188 .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
1189 .await
1190 .map_err(map_storage)?;
1191 normalize_vector(vector_raw)
1192 }
1193 };
1194
1195 if candidates.is_empty() {
1196 return Ok(empty_response());
1197 }
1198
1199 let keys = candidates
1202 .iter()
1203 .map(|candidate| MessageKey {
1204 session_id: candidate.session_id.clone(),
1205 message_id: candidate.message_id.clone(),
1206 })
1207 .collect::<Vec<_>>();
1208 let metas = store
1209 .message_metas_by_keys(&keys)
1210 .await
1211 .map_err(map_storage)?;
1212 let meta_index = metas
1213 .iter()
1214 .map(|meta| ((meta.session_id.as_str(), meta.message_id.as_str()), meta))
1215 .collect::<std::collections::HashMap<_, _>>();
1216
1217 let mut scored = Vec::with_capacity(candidates.len());
1218 for candidate in candidates {
1219 let Some(meta) =
1220 meta_index.get(&(candidate.session_id.as_str(), candidate.message_id.as_str()))
1221 else {
1222 continue;
1223 };
1224 let score = candidate.base_score;
1225 if score < plan.min_score {
1226 continue;
1227 }
1228 scored.push(ScoredHit {
1229 meta: (*meta).clone(),
1230 score,
1231 });
1232 }
1233 scored.sort_by(|left, right| {
1234 right
1235 .score
1236 .partial_cmp(&left.score)
1237 .unwrap_or(std::cmp::Ordering::Equal)
1238 .then_with(|| left.meta.session_id.cmp(&right.meta.session_id))
1239 .then_with(|| left.meta.message_id.cmp(&right.meta.message_id))
1240 });
1241
1242 let matched_total = scored.len();
1243 let sessions = build_sessions(store, &scored, &plan.query).await?;
1244 page_sessions(sessions, matched_total, &plan)
1245 }
1246
1247 async fn resolve_effective_mode(
1252 store: &Store,
1253 override_mode: Option<SearchMode>,
1254 ) -> Result<SearchMode, ErrorEnvelope> {
1255 if let Some(mode) = override_mode {
1256 return Ok(mode);
1257 }
1258 let has = store.has_embeddings().await.map_err(map_storage)?;
1259 Ok(if has {
1260 SearchMode::Hybrid
1261 } else {
1262 SearchMode::Fts
1263 })
1264 }
1265
1266 async fn load_embedder(
1270 embedder: &LazyEmbedder,
1271 ) -> Result<std::sync::Arc<dyn Embedder>, ErrorEnvelope> {
1272 embedder.get().await.map_err(|error| {
1273 map_error(crate::Error::internal(format!(
1274 "embedder load failed: {error}"
1275 )))
1276 })
1277 }
1278
1279 pub fn plan_search(
1280 request: SearchRequest,
1281 mode: SearchMode,
1282 ) -> Result<SearchPlan, ErrorEnvelope> {
1283 validate_protocol(request.protocol_version)?;
1284
1285 let _ns = super::resolve_namespace(request.namespace.as_deref())?;
1286
1287 let cursor = match request.cursor.as_deref() {
1288 Some(raw) => Some(decode_search_cursor(raw)?),
1289 None => None,
1290 };
1291 let (query_raw, similar_raw, filters, offset) = match cursor {
1292 Some(cursor) => (
1293 cursor.query,
1294 cursor.similar_to,
1295 cursor.filters,
1296 cursor.offset,
1297 ),
1298 None => (request.query, request.similar_to, request.filters, 0),
1299 };
1300 let query = query_raw.trim().to_owned();
1301 let similar_to = similar_raw
1302 .as_ref()
1303 .map(|id| id.trim().to_owned())
1304 .filter(|id| !id.is_empty());
1305 if similar_to.is_none() && query.is_empty() {
1306 return Err(map_error(crate::Error::validation_field(
1307 "query must be non-empty after trim",
1308 "query",
1309 Some(serde_json::json!(query_raw)),
1310 Some("non-empty string after trim, or pass `similar_to`".to_owned()),
1311 )));
1312 }
1313 if request.limit == 0 {
1314 return Err(map_error(crate::Error::validation_field(
1315 "limit must be at least 1",
1316 "limit",
1317 Some(serde_json::json!(request.limit)),
1318 Some("integer >= 1".to_owned()),
1319 )));
1320 }
1321 let limit = request.limit.min(LIMIT_CAP);
1322 let min_score = filters.min_score;
1323 let filter = build_filter(&filters)?;
1324 let pool = limit.saturating_mul(5).max(50);
1327 Ok(SearchPlan {
1328 mode,
1329 query,
1330 similar_to,
1331 filter,
1332 filters,
1333 pool,
1334 vector_pool: pool.saturating_mul(2),
1335 limit,
1336 offset,
1337 min_score,
1338 })
1339 }
1340
1341 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1342 pub enum RetrieverKind {
1343 Vector,
1344 Fts,
1345 }
1346
1347 impl RetrieverKind {
1348 fn as_wire(self) -> &'static str {
1349 match self {
1350 Self::Vector => "vector",
1351 Self::Fts => "fts",
1352 }
1353 }
1354 }
1355
1356 pub struct RankedList {
1363 pub retriever: RetrieverKind,
1364 pub entries: Vec<(MessageKey, f64)>,
1365 pub weight: f64,
1366 }
1367
1368 fn wire_mode_to_internal(wire: crate::wire::SearchModeWire) -> SearchMode {
1371 match wire {
1372 crate::wire::SearchModeWire::Fts => SearchMode::Fts,
1373 crate::wire::SearchModeWire::Vector => SearchMode::Vector,
1374 crate::wire::SearchModeWire::Hybrid => SearchMode::Hybrid,
1375 }
1376 }
1377
1378 #[derive(Debug, Clone, PartialEq)]
1380 pub struct FusedHit {
1381 pub key: MessageKey,
1382 pub score: f64,
1383 pub matched_via: Vec<String>,
1384 }
1385
1386 fn session_root(session_id: &str) -> &str {
1391 match session_id.find('/') {
1392 Some(idx) => &session_id[..idx],
1393 None => session_id,
1394 }
1395 }
1396
1397 pub fn fuse_arms(lists: &[RankedList]) -> Vec<FusedHit> {
1420 let mut merged: std::collections::HashMap<String, (f64, Vec<String>, MessageKey)> =
1421 std::collections::HashMap::new();
1422 for list in lists {
1423 if list.entries.is_empty() {
1424 continue;
1425 }
1426 let mut lo = f64::INFINITY;
1435 let mut hi = f64::NEG_INFINITY;
1436 for (_, raw) in &list.entries {
1437 if *raw < lo {
1438 lo = *raw;
1439 }
1440 if *raw > hi {
1441 hi = *raw;
1442 }
1443 }
1444 let range = hi - lo;
1445 let mut seen_in_arm: std::collections::HashSet<String> =
1450 std::collections::HashSet::new();
1451 for (key, raw) in &list.entries {
1452 let root = session_root(&key.session_id).to_owned();
1453 if !seen_in_arm.insert(root.clone()) {
1454 continue;
1455 }
1456 let norm = if range > 0.0 { (raw - lo) / range } else { 0.0 };
1457 let contribution = list.weight * norm;
1458 let entry = merged
1459 .entry(root)
1460 .or_insert_with(|| (0.0, Vec::new(), key.clone()));
1461 entry.0 += contribution;
1462 entry.1.push(list.retriever.as_wire().to_owned());
1463 }
1464 }
1465 let mut hits = merged
1466 .into_values()
1467 .map(|(score, matched_via, key)| FusedHit {
1468 key,
1469 score,
1470 matched_via,
1471 })
1472 .collect::<Vec<_>>();
1473 hits.sort_by(|left, right| {
1474 right
1475 .score
1476 .partial_cmp(&left.score)
1477 .unwrap_or(std::cmp::Ordering::Equal)
1478 .then_with(|| left.key.cmp(&right.key))
1479 });
1480 hits
1481 }
1482
1483 const ANCHOR_MIN_TERM_CHARS: usize = 4;
1488
1489 pub fn hit_payload(text: &str, query: &str) -> String {
1494 let chars_len = text.chars().count();
1495 if chars_len <= HIT_SNIPPET_CHARS {
1496 return text.to_owned();
1497 }
1498 query_snippet(text, query)
1499 }
1500
1501 fn query_snippet(text: &str, query: &str) -> String {
1517 let lower_text = text.to_lowercase();
1518 let terms: Vec<String> = query
1519 .split_whitespace()
1520 .filter(|term| !term.is_empty())
1521 .map(str::to_lowercase)
1522 .collect();
1523 let any_informative = terms
1524 .iter()
1525 .any(|term| term.chars().count() >= ANCHOR_MIN_TERM_CHARS);
1526 let hit = terms
1527 .iter()
1528 .filter(|term| !any_informative || term.chars().count() >= ANCHOR_MIN_TERM_CHARS)
1529 .filter_map(|term| lower_text.find(term.as_str()))
1530 .min();
1531 let chars: Vec<char> = text.chars().collect();
1532 let center = hit
1536 .map(|byte| lower_text[..byte].chars().count())
1537 .unwrap_or(0);
1538 let half = HIT_SNIPPET_CHARS / 2;
1539 let start = center.saturating_sub(half);
1540 let end = (start + HIT_SNIPPET_CHARS).min(chars.len());
1541 let start = end.saturating_sub(HIT_SNIPPET_CHARS);
1542 let mut snippet = String::new();
1546 if start > 0 {
1547 snippet.push_str(&format!("[{start} chars before] "));
1548 }
1549 snippet.extend(&chars[start..end]);
1550 if end < chars.len() {
1551 snippet.push_str(&format!(
1552 " [+{} more chars; pond_get for full]",
1553 chars.len() - end
1554 ));
1555 }
1556 snippet
1557 }
1558
1559 struct Candidate {
1560 session_id: String,
1561 message_id: String,
1562 base_score: f64,
1563 }
1564
1565 struct ScoredHit {
1566 meta: MessageMeta,
1567 score: f64,
1568 }
1569
1570 impl ScoredHit {
1571 fn to_search_result(
1572 &self,
1573 query: &str,
1574 summaries: &HashMap<(String, String), Vec<PartSummary>>,
1575 ) -> Result<SearchResult, ErrorEnvelope> {
1576 let text = hit_payload(&self.meta.search_text, query);
1577 let role = match self.meta.role.as_str() {
1578 "system" => Role::System,
1579 "user" => Role::User,
1580 "assistant" => Role::Assistant,
1581 "tool" => Role::Tool,
1582 other => {
1583 return Err(map_error(crate::Error::internal(format!(
1584 "stored message has unknown role: {other}"
1585 ))));
1586 }
1587 };
1588 let parts_summary = if matches!(role, Role::User) {
1591 summaries
1592 .get(&(self.meta.session_id.clone(), self.meta.message_id.clone()))
1593 .cloned()
1594 .unwrap_or_default()
1595 } else {
1596 Vec::new()
1597 };
1598 Ok(SearchResult {
1599 message_id: self.meta.message_id.clone(),
1600 role,
1601 timestamp: self.meta.timestamp,
1602 text,
1603 score: normalize_score(self.score),
1604 parts_summary,
1605 })
1606 }
1607 }
1608
1609 fn normalize_score(score: f64) -> f64 {
1610 (score / SCORE_DENOMINATOR).clamp(0.0, 1.0)
1611 }
1612
1613 fn normalize_fts(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1614 let max = hits.iter().map(|(_, score)| *score).fold(0.0_f32, f32::max);
1615 hits.into_iter()
1616 .map(|(key, score)| Candidate {
1617 session_id: key.session_id,
1618 message_id: key.message_id,
1619 base_score: if max > 0.0 {
1620 f64::from(score / max)
1621 } else {
1622 0.0
1623 },
1624 })
1625 .collect()
1626 }
1627
1628 fn normalize_vector(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1629 let n = hits.len() as f64;
1630 hits.into_iter()
1631 .enumerate()
1632 .map(|(idx, (key, _))| Candidate {
1633 session_id: key.session_id,
1634 message_id: key.message_id,
1635 base_score: if n > 0.0 { 1.0 - (idx as f64 / n) } else { 0.0 },
1636 })
1637 .collect()
1638 }
1639
1640 fn embed_query(embedder: &dyn Embedder, query: &str) -> Result<Vec<f32>, ErrorEnvelope> {
1641 let prompt = format_query(query);
1642 let vectors =
1646 tokio::task::block_in_place(|| embedder.embed(&[prompt])).map_err(|error_value| {
1647 map_error(crate::Error::internal(format!(
1648 "failed to embed query: {error_value}"
1649 )))
1650 })?;
1651 vectors.into_iter().next().ok_or_else(|| {
1652 map_error(crate::Error::internal(
1653 "embedder returned no vector for query",
1654 ))
1655 })
1656 }
1657
1658 async fn build_sessions(
1659 store: &Store,
1660 scored: &[ScoredHit],
1661 query: &str,
1662 ) -> Result<Vec<SearchSession>, ErrorEnvelope> {
1663 use std::collections::BTreeMap;
1664
1665 struct Acc {
1666 project: String,
1667 source_agent: String,
1668 matched_count: usize,
1669 matches: Vec<SearchResult>,
1670 }
1671 let mut user_ids_by_session: BTreeMap<String, Vec<String>> = BTreeMap::new();
1675 for hit in scored {
1676 if hit.meta.role == "user" {
1677 user_ids_by_session
1678 .entry(hit.meta.session_id.clone())
1679 .or_default()
1680 .push(hit.meta.message_id.clone());
1681 }
1682 }
1683 let mut summaries: HashMap<(String, String), Vec<PartSummary>> = HashMap::new();
1684 for (session_id, message_ids) in &user_ids_by_session {
1685 for (key, parts) in store
1686 .summary_parts_for_messages(session_id, message_ids)
1687 .await
1688 .map_err(map_storage)?
1689 {
1690 summaries.insert(
1691 key,
1692 parts
1693 .iter()
1694 .filter_map(|part| PartSummary::for_kind(&part.kind))
1695 .collect(),
1696 );
1697 }
1698 }
1699
1700 let mut groups: BTreeMap<String, Acc> = BTreeMap::new();
1701 for hit in scored {
1702 let root = session_root(&hit.meta.session_id).to_owned();
1703 let entry = groups.entry(root).or_insert_with(|| Acc {
1704 project: hit.meta.project.clone(),
1705 source_agent: hit.meta.source_agent.clone(),
1706 matched_count: 0,
1707 matches: Vec::new(),
1708 });
1709 entry.matched_count += 1;
1710 entry.matches.push(hit.to_search_result(query, &summaries)?);
1711 }
1712
1713 let session_ids = groups.keys().cloned().collect::<Vec<_>>();
1714 let counts = store
1715 .session_message_counts(&session_ids)
1716 .await
1717 .map_err(map_storage)?;
1718
1719 let mut result = groups
1720 .into_iter()
1721 .map(|(session_id, mut acc)| {
1722 acc.matches.sort_by(|left, right| {
1723 right
1724 .score
1725 .partial_cmp(&left.score)
1726 .unwrap_or(std::cmp::Ordering::Equal)
1727 .then_with(|| left.message_id.cmp(&right.message_id))
1728 });
1729 acc.matches.truncate(MAX_MATCHES_PER_SESSION);
1730 SearchSession {
1731 session_messages_count: counts.get(&session_id).copied().unwrap_or_default(),
1732 session_id,
1733 project: acc.project,
1734 source_agent: acc.source_agent,
1735 matched_message_count: acc.matched_count,
1736 matches: acc.matches,
1737 }
1738 })
1739 .collect::<Vec<_>>();
1740 result.sort_by(|left, right| {
1741 let left_score = left
1742 .matches
1743 .first()
1744 .map(|hit| hit.score)
1745 .unwrap_or_default();
1746 let right_score = right
1747 .matches
1748 .first()
1749 .map(|hit| hit.score)
1750 .unwrap_or_default();
1751 right_score
1752 .partial_cmp(&left_score)
1753 .unwrap_or(std::cmp::Ordering::Equal)
1754 .then_with(|| left.session_id.cmp(&right.session_id))
1755 });
1756 Ok(result)
1757 }
1758
1759 fn page_sessions(
1760 sessions: Vec<SearchSession>,
1761 matched_total: usize,
1762 plan: &SearchPlan,
1763 ) -> Result<SearchResponse, ErrorEnvelope> {
1764 if plan.offset >= sessions.len() {
1765 return Ok(SearchResponse {
1766 sessions: Vec::new(),
1767 matched_total,
1768 has_more: false,
1769 next_cursor: None,
1770 });
1771 }
1772
1773 let mut emitted = Vec::new();
1774 let mut used_bytes = 0usize;
1775 for session in sessions.iter().skip(plan.offset) {
1776 if emitted.len() >= plan.limit {
1777 break;
1778 }
1779 let bytes = serde_json::to_string(session)
1780 .map_err(|error| {
1781 map_error(crate::Error::internal(format!(
1782 "failed to size search response session: {error}"
1783 )))
1784 })?
1785 .len();
1786 if !emitted.is_empty() && used_bytes.saturating_add(bytes) > SEARCH_BUDGET_BYTES {
1787 break;
1788 }
1789 used_bytes = used_bytes.saturating_add(bytes);
1790 emitted.push(session.clone());
1791 }
1792
1793 let next_offset = plan.offset + emitted.len();
1794 let has_more = next_offset < sessions.len();
1795 let next_cursor = has_more.then(|| {
1796 encode_search_cursor(&SearchCursor {
1797 query: plan.query.clone(),
1798 similar_to: plan.similar_to.clone(),
1799 filters: plan.filters.clone(),
1800 offset: next_offset,
1801 })
1802 });
1803
1804 Ok(SearchResponse {
1805 sessions: emitted,
1806 matched_total,
1807 has_more,
1808 next_cursor,
1809 })
1810 }
1811
1812 pub fn build_filter(filters: &SearchFilters) -> Result<Predicate, ErrorEnvelope> {
1816 let mut clauses = Vec::new();
1817
1818 match &filters.project {
1819 None => {}
1820 Some(ProjectFilter::Contains(value)) => {
1821 clauses.push(Predicate::LikeContains("project", value.clone()));
1822 }
1823 Some(ProjectFilter::Regex(pattern)) => {
1824 clauses.push(Predicate::Regex("project", pattern.clone()));
1825 }
1826 }
1827
1828 if let Some(session_id) = &filters.session_id {
1829 clauses.push(Predicate::Eq("session_id", session_id.clone().into()));
1830 }
1831 if let Some(source_agent) = &filters.source_agent {
1832 clauses.push(Predicate::Eq("source_agent", source_agent.clone().into()));
1833 }
1834 if let Some(role) = &filters.role {
1835 if !matches!(role.as_str(), "user" | "assistant" | "system" | "tool") {
1836 return Err(map_error(crate::Error::validation_field(
1837 format!(
1838 "filters.role must be one of: user, assistant, system, tool; got {role}"
1839 ),
1840 "filters.role",
1841 Some(serde_json::json!(role)),
1842 Some("one of: user, assistant, system, tool".to_owned()),
1843 )));
1844 }
1845 clauses.push(Predicate::Eq("role", role.clone().into()));
1846 }
1847 if let Some(from_date) = &filters.from_date {
1848 clauses.push(Predicate::Gte(
1849 "timestamp",
1850 ScalarValue::Raw(date_bound(from_date, "filters.from_date", false)?),
1851 ));
1852 }
1853 if let Some(to_date) = &filters.to_date {
1854 clauses.push(Predicate::Lte(
1855 "timestamp",
1856 ScalarValue::Raw(date_bound(to_date, "filters.to_date", true)?),
1857 ));
1858 }
1859
1860 Ok(Predicate::And(clauses))
1861 }
1862
1863 fn date_bound(date: &str, field: &str, end_of_day: bool) -> Result<String, ErrorEnvelope> {
1866 NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| {
1867 map_error(crate::Error::validation_field(
1868 format!("{field} must be in YYYY-MM-DD format; got {date}"),
1869 field,
1870 Some(serde_json::json!(date)),
1871 Some("YYYY-MM-DD".to_owned()),
1872 ))
1873 })?;
1874 let time = if end_of_day { "23:59:59" } else { "00:00:00" };
1875 Ok(format!("timestamp '{date} {time}'"))
1876 }
1877
1878 fn empty_response() -> SearchResponse {
1879 SearchResponse {
1880 sessions: Vec::new(),
1881 matched_total: 0,
1882 has_more: false,
1883 next_cursor: None,
1884 }
1885 }
1886
    // Tests for the fusion helpers, including the private `session_root`.
    #[cfg(test)]
    mod fusion_helpers_tests {
        #![allow(clippy::expect_used, clippy::unwrap_used)]

        use super::*;

        #[test]
        fn session_root_strips_agent_suffix_for_claude_code_subagents() {
            // An id without '/' is its own root.
            assert_eq!(
                session_root("94a50f23-1234-5678-9abc-def012345678"),
                "94a50f23-1234-5678-9abc-def012345678",
            );
            // Everything from the first '/' onward is dropped.
            assert_eq!(
                session_root("94a50f23-1234-5678-9abc-def012345678/agent-abc123"),
                "94a50f23-1234-5678-9abc-def012345678",
            );
            // Only the first '/' matters, even with deeper nesting.
            assert_eq!(session_root("root/a/b"), "root");
        }

        #[test]
        fn fuse_arms_dedupes_intra_arm_by_session_root_and_credits_cross_arm() {
            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
                session_id: sid.to_owned(),
                message_id: mid.to_owned(),
            };
            // FTS scores span 5.0..=10.0. Normalised values: msg-1 = 1.0 and msg-3 = 0.2.
            // msg-2 and the `session-A/agent-x` entry are skipped because root session-A
            // was already counted by msg-1.
            let fts = RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![
                    (mk("session-A", "msg-1"), 10.0),
                    (mk("session-A", "msg-2"), 9.0),
                    (mk("session-B", "msg-3"), 6.0),
                    (mk("session-A/agent-x", "msg-4"), 5.0),
                ],
                weight: 0.135,
            };
            // Vector scores span 0.6..=0.9: session-B normalises to 1.0, session-A to 0.0.
            let vec_arm = RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![
                    (mk("session-B", "msg-7"), 0.9),
                    (mk("session-A", "msg-9"), 0.6),
                ],
                weight: 1.0,
            };
            // Totals: session-B = 0.2 * 0.135 + 1.0 = 1.027; session-A = 1.0 * 0.135 = 0.135.
            let merged = fuse_arms(&[fts, vec_arm]);
            assert_eq!(merged.len(), 2);
            assert_eq!(merged[0].key.session_id, "session-B");
            // The key comes from the first arm that reached the root (FTS here).
            assert_eq!(merged[0].key.message_id, "msg-3");
            assert_eq!(merged[0].matched_via, vec!["fts", "vector"]);
            assert_eq!(merged[1].key.session_id, "session-A");
            assert_eq!(merged[1].key.message_id, "msg-1");
            // A zero contribution (vector, for session-A) is still recorded as provenance.
            assert_eq!(merged[1].matched_via, vec!["fts", "vector"]);
        }

        #[test]
        fn fuse_arms_collapses_degenerate_tied_arm_to_zero_contribution() {
            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
                session_id: sid.to_owned(),
                message_id: mid.to_owned(),
            };
            // Both FTS scores are equal, so the range is 0 and every entry normalises to 0.0.
            let fts = RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![(mk("session-A", "a"), 1.0), (mk("session-B", "b"), 1.0)],
                weight: 0.135,
            };
            // Vector arm: session-A normalises to 1.0 and session-B to 0.0.
            let vec_arm = RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![(mk("session-A", "a"), 0.9), (mk("session-B", "b"), 0.3)],
                weight: 1.0,
            };
            let merged = fuse_arms(&[fts, vec_arm]);
            assert_eq!(merged[0].key.session_id, "session-A");
            assert!((merged[0].score - 1.0).abs() < 1e-9);
            assert!(merged[1].score.abs() < 1e-9);
        }
    }
1983}
1984
1985pub use search_handler::{
1986 FusedHit, RankedList, RetrieverKind, SearchMode, SearchPlan, build_filter, explain_search_plan,
1987 fuse_arms, hit_payload, plan_search, pond_search,
1988};
1989
// Tests for the public search helpers (fusion, snippets, filters, planning) and lineage restore.
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::wire::{ProjectFilter, SearchFilters, SearchRequest};
    use chrono::Utc;

    // Baseline request in the "local" namespace with default filters, limit 20 and no cursor.
    fn search_request(query: &str) -> SearchRequest {
        SearchRequest {
            protocol_version: crate::PROTOCOL_VERSION,
            namespace: Some("local".to_owned()),
            query: query.to_owned(),
            mode_override: None,
            similar_to: None,
            filters: SearchFilters::default(),
            limit: 20,
            cursor: None,
        }
    }

    // Shorthand for building a message key.
    fn key(session: &str, id: &str) -> crate::sessions::MessageKey {
        crate::sessions::MessageKey {
            session_id: session.to_owned(),
            message_id: id.to_owned(),
        }
    }

    #[test]
    fn fuse_arms_fuses_retrievers_and_reports_provenance() {
        // Vector arm spans 0.5..=0.9 (weight 1.0): a -> 1.0, b -> 0.5, c -> 0.0.
        // FTS arm spans 4.0..=10.0 (weight 0.135): b -> 0.135, a -> ~0.09, d -> 0.0.
        // Fused totals: a ~= 1.09, b = 0.635, c and d = 0.0.
        let lists = [
            RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![
                    (key("session-a", "a"), 0.9),
                    (key("session-b", "b"), 0.7),
                    (key("session-c", "c"), 0.5),
                ],
                weight: 1.0,
            },
            RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![
                    (key("session-b", "b"), 10.0),
                    (key("session-a", "a"), 8.0),
                    (key("session-d", "d"), 4.0),
                ],
                weight: 0.135,
            },
        ];
        let merged = fuse_arms(&lists);

        assert_eq!(merged[0].key.session_id, "session-a");
        assert_eq!(merged[1].key.session_id, "session-b");
        // Provenance follows arm order in `lists`.
        assert_eq!(merged[0].matched_via, vec!["vector", "fts"]);
        assert!(merged[0].score > merged[1].score);

        // Hits seen by only one arm report only that arm.
        let c = merged
            .iter()
            .find(|hit| hit.key.session_id == "session-c")
            .unwrap();
        assert_eq!(c.matched_via, vec!["vector"]);
        let d = merged
            .iter()
            .find(|hit| hit.key.session_id == "session-d")
            .unwrap();
        assert_eq!(d.matched_via, vec!["fts"]);
    }

    #[test]
    fn hit_payload_returns_short_text_in_full() {
        let short = "a short message body";
        let text = hit_payload(short, "message");
        assert_eq!(text, short, "small text is returned as-is");
    }

    #[test]
    fn hit_payload_windows_long_text_around_the_query_term() {
        // 2400 chars with the term deep inside, so the windowed path is taken.
        let body = format!("{}NEEDLE{}", "a".repeat(2000), "b".repeat(394));
        let text = hit_payload(&body, "needle");
        assert!(
            text.contains("NEEDLE"),
            "text is the match-windowed snippet: {text}"
        );
        // NOTE(review): 600 presumably mirrors HIT_SNIPPET_CHARS; 64 leaves room for the
        // before/after markers.
        assert!(
            text.chars().count() <= 600 + 64,
            "snippet window is bounded by HIT_SNIPPET_CHARS plus markers: {}",
            text.chars().count()
        );
    }

    #[test]
    fn hit_payload_snippet_survives_case_folding_that_changes_byte_length() {
        // 'İ' lowercases to "i\u{307}" (3 bytes, 2 chars) while it is 2 bytes / 1 char in
        // the original, so offsets in the lowered text differ from the original's.
        let body = format!("İÉÉÉ{}", "a".repeat(2100));
        let text = hit_payload(&body, "ééé");
        assert!(
            text.contains("ÉÉÉ"),
            "snippet windows on the matched term: {text}"
        );
    }

    #[tokio::test]
    async fn restore_lineage_rejects_a_graph_nesting_deeper_than_one_level() {
        use crate::adapter::Extracted;
        use crate::sessions::Store;
        use crate::wire::{ProviderOptions, Session};
        use tempfile::TempDir;

        // Minimal session fixture with an optional parent.
        let session = |id: &str, parent: Option<&str>| Session {
            id: id.to_owned(),
            parent_session_id: parent.map(str::to_owned),
            parent_message_id: None,
            source_agent: "claude-code".to_owned(),
            created_at: Utc::now(),
            project: Extracted::from_test_value("/tmp/pond".to_owned()),
            options: ProviderOptions::new(),
        };

        let dir = TempDir::new().unwrap();
        let store = Store::open_local(dir.path()).await.unwrap();
        // Chain a <- b <- c: `c` sits two levels below `a`.
        store
            .upsert_sessions(&[
                session("a", None),
                session("b", Some("a")),
                session("c", Some("b")),
            ])
            .await
            .unwrap();

        // Restoring from `a` must fail with the deeper-graph error.
        let err = restore_lineage(&store, "a").await.unwrap_err();
        assert!(
            err.to_string().contains("one subagent level"),
            "expected the deeper-graph error, got: {err}"
        );

        // Restoring from `b` yields `b` followed by its direct child `c`.
        let lineage = restore_lineage(&store, "b").await.unwrap();
        let ids: Vec<&str> = lineage.iter().map(|s| s.session.id.as_str()).collect();
        assert_eq!(ids, ["b", "c"]);
    }

    #[test]
    fn build_filter_pushes_down_each_predicate_and_handles_empty() {
        let filters = SearchFilters {
            project: Some(ProjectFilter::Contains("/Users/me/pond".to_owned())),
            session_id: Some("01HXY".to_owned()),
            source_agent: Some("claude-code".to_owned()),
            role: Some("assistant".to_owned()),
            from_date: Some("2026-01-01".to_owned()),
            to_date: Some("2026-05-01".to_owned()),
            min_score: 0.0,
        };
        let sql = build_filter(&filters).unwrap().to_lance();
        assert!(sql.contains("project LIKE '%/Users/me/pond%'"));
        assert!(sql.contains("session_id = '01HXY'"));
        assert!(sql.contains("source_agent = 'claude-code'"));
        assert!(sql.contains("role = 'assistant'"));
        assert!(sql.contains("timestamp >="));
        assert!(sql.contains("timestamp <="));

        // No filters render to an empty predicate string.
        assert_eq!(
            build_filter(&SearchFilters::default()).unwrap().to_lance(),
            "",
        );
    }

    #[test]
    fn build_filter_rejects_bad_role_and_date() {
        let bad_role = SearchFilters {
            role: Some("wizard".to_owned()),
            ..SearchFilters::default()
        };
        assert!(build_filter(&bad_role).is_err());

        // Day-first ordering does not match `%Y-%m-%d`.
        let bad_date = SearchFilters {
            from_date: Some("01-01-2026".to_owned()),
            ..SearchFilters::default()
        };
        assert!(build_filter(&bad_date).is_err());
    }

    #[test]
    fn build_filter_contains_escapes_like_wildcards() {
        // `_` is a LIKE wildcard, so a literal underscore in the project must be escaped.
        let filters = SearchFilters {
            project: Some(ProjectFilter::Contains("/Users/me/my_project".to_owned())),
            ..SearchFilters::default()
        };
        let sql = build_filter(&filters).unwrap().to_lance();
        assert!(
            sql.contains(r"my\_project"),
            "underscore must be escaped: {sql}"
        );
        assert!(
            sql.contains(r"ESCAPE '\'"),
            "predicate must declare the escape char: {sql}"
        );
    }

    #[test]
    fn plan_search_shapes_request_for_each_planning_input() {
        // Query is trimmed; limit 500 is capped to 200; pool = 5 * limit, vector pool = 2 * pool.
        let mut request = search_request("  vector memory  ");
        request.limit = 500;
        request.filters.min_score = 0.42;
        let plan = plan_search(request, SearchMode::Hybrid).unwrap();
        assert_eq!(plan.mode, SearchMode::Hybrid);
        assert_eq!(plan.query, "vector memory");
        assert_eq!(plan.limit, 200);
        assert_eq!(plan.pool, 1000);
        assert_eq!(plan.vector_pool, 2000);
        assert_eq!(plan.min_score, 0.42);

        // A tiny limit still gets the minimum pool of 50.
        let mut request = search_request("tiny pool");
        request.limit = 1;
        let plan = plan_search(request, SearchMode::Fts).unwrap();
        assert_eq!(plan.mode, SearchMode::Fts);
        assert_eq!(plan.limit, 1);
        assert_eq!(plan.pool, 50);
        assert_eq!(plan.vector_pool, 100);

        // Filters are compiled into the plan's predicate.
        let mut request = search_request("filtered");
        request.filters.project = Some(ProjectFilter::Contains("/Users/me/pond".to_owned()));
        request.filters.role = Some("assistant".to_owned());
        let plan = plan_search(request, SearchMode::Fts).unwrap();
        let sql = plan.filter.to_lance();
        assert!(sql.contains("project LIKE"));
        assert!(sql.contains("role = 'assistant'"));
    }

    #[test]
    fn plan_search_rejects_invalid_composition_before_execution() {
        // Whitespace-only query with no `similar_to` is rejected on the `query` field.
        let mut blank = search_request("   ");
        let error = plan_search(blank.clone(), SearchMode::Fts)
            .unwrap_err()
            .error;
        assert_eq!(error.code, crate::wire::ErrorCode::ValidationFailed);
        assert_eq!(error.details["field"], "query");

        // A zero limit is rejected on the `limit` field.
        blank.query = "valid".to_owned();
        blank.limit = 0;
        let error = plan_search(blank.clone(), SearchMode::Fts)
            .unwrap_err()
            .error;
        assert_eq!(error.details["field"], "limit");

        // A namespace other than the default is reported as unknown.
        blank.limit = 1;
        blank.namespace = Some("remote".to_owned());
        let error = plan_search(blank, SearchMode::Fts).unwrap_err().error;
        assert_eq!(error.code, crate::wire::ErrorCode::NamespaceUnknown);
        assert_eq!(error.details["namespace"], "remote");
    }
}