1fn map_error(error: crate::Error) -> crate::wire::ErrorEnvelope {
2 error.into()
3}
4
/// A namespace path, stored as its ordered list of segments.
///
/// The root namespace is the empty list.
#[derive(Debug, Clone)]
pub struct NamespaceIdent(pub Vec<String>);

impl NamespaceIdent {
    /// Returns the root namespace, which has no segments.
    pub fn root() -> Self {
        Self(Vec::new())
    }

    /// Builds a table identifier: this namespace's segments followed by `table_name`.
    ///
    /// The namespace itself is left untouched; the returned vector is a fresh copy.
    pub fn as_table_id(&self, table_name: &str) -> Vec<String> {
        self.0
            .iter()
            .cloned()
            .chain(std::iter::once(table_name.to_owned()))
            .collect()
    }
}
22
23pub fn resolve_namespace(
27 namespace: Option<&str>,
28) -> Result<NamespaceIdent, crate::wire::ErrorEnvelope> {
29 match namespace {
30 None | Some(crate::wire::DEFAULT_NAMESPACE) => Ok(NamespaceIdent::root()),
31 Some(other) => Err(map_error(crate::Error::namespace_unknown(other))),
32 }
33}
34
35fn map_storage(error: anyhow::Error) -> crate::wire::ErrorEnvelope {
36 if let Some(conflict) = error.downcast_ref::<crate::substrate::ConflictExhausted>() {
39 return map_error(crate::Error::Conflict {
40 attempts: conflict.attempts,
41 });
42 }
43 map_error(crate::Error::Storage(error))
44}
45
46mod ingest_handler {
47 use anyhow::Result;
48 use tokio_stream::StreamExt;
49
50 use crate::{
51 adapter::{Adapter, AdapterYield, SkipOracle, SkipReason},
52 sessions::{IngestEvent, IngestSummary, IngestValidator, OutcomeStatus, RowOutcome, Store},
53 wire::{
54 ErrorBody, ErrorCode, IngestEnvelope, IngestRequest, IngestResponse, IngestResult,
55 IngestStatus, validate_protocol,
56 },
57 };
58
59 use super::{map_error, map_storage};
60
/// Largest number of events `pond_ingest` accepts in a single request; a bigger
/// batch is rejected with a validation error on the `events` field.
pub const MAX_INGEST_EVENTS: usize = 1000;
63
/// Progress notification passed to the `on_event` callback of `ingest_adapter`.
///
/// Derives `PartialEq`/`Eq` so callers can compare and assert on received events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncEvent {
    /// Emitted once, after the adapter's discovery step. `total` is `None` when
    /// discovery failed.
    Discovered { total: Option<usize> },
    /// Emitted once per finished or skipped session.
    SessionDone(SessionOutcome),
}

/// Final report for one session (or one skipped input) of an adapter run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionOutcome {
    /// Project the session belongs to, when known.
    pub project: Option<String>,
    /// Session identifier; `None` when an adapter error arrived with no session in progress.
    pub session_id: Option<String>,
    /// Number of message events seen for the session (0 for skipped inputs).
    pub messages: usize,
    /// How the session ended.
    pub status: SyncStatus,
}

/// Terminal state of a session within an adapter run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncStatus {
    /// The session was ingested with no dropped events and no session-level rejection.
    Ok,
    /// The session was ingested but some of its events were dropped.
    Partial {
        /// Number of events dropped for this session.
        dropped_events: usize,
        /// Message of the first drop that carried one, if any.
        first_drop_reason: Option<String>,
    },
    /// The input was skipped: the adapter reported it unsupported, or produced an
    /// error while no session was in progress.
    Skipped {
        /// Why the input was skipped.
        reason: String,
    },
    /// The session row itself was rejected by validation.
    Rejected {
        /// Rejection message.
        reason: String,
    },
    /// The adapter skipped the session with `SkipReason::Fresh`.
    Fresh,
    /// The adapter skipped the session with `SkipReason::Empty`.
    Empty,
}
130
/// Running tally for the session whose events `ingest_adapter` is currently streaming.
#[derive(Debug, Default)]
struct InFlight {
    /// Project copied from the session event.
    project: Option<String>,
    /// Identifier copied from the session event.
    session_id: String,
    /// Message events seen so far for this session.
    messages: usize,
    /// Events dropped so far (non-session error outcomes plus adapter errors).
    dropped_events: usize,
    /// Message of the first drop that carried one.
    first_drop_reason: Option<String>,
    /// Event index at which the session event was pushed to the validator.
    session_index: usize,
}
147
/// A finished session queued until validator flush outcomes are available, at
/// which point `drain_pending_dones` reports it as a `SessionDone` event.
///
/// Carries the same tallies as `InFlight`.
#[derive(Debug)]
struct PendingDone {
    /// Project of the finished session.
    project: Option<String>,
    /// Identifier of the finished session.
    session_id: String,
    /// Message events seen for the session.
    messages: usize,
    /// Events dropped while the session was being streamed.
    dropped_events: usize,
    /// Message of the first drop that carried one.
    first_drop_reason: Option<String>,
    /// Event index of the session event; used to look up its row outcome.
    session_index: usize,
}
161
/// `ingest_adapter` flushes the validator mid-run once its `pending_substreams()`
/// count reaches this value.
const ADAPTER_FLUSH_BATCH: usize = 100;
169
170 pub async fn ingest_adapter<F>(
181 store: &Store,
182 adapter: &dyn Adapter,
183 oracle: &dyn SkipOracle,
184 mut on_event: F,
185 ) -> Result<IngestSummary>
186 where
187 F: FnMut(SyncEvent),
188 {
189 let mut summary = IngestSummary::default();
190 let truncations_before = crate::adapter::extract::truncated_values_count();
191 let total = adapter
195 .discover()
196 .await
197 .map_err(|error| tracing::debug!(%error, "adapter discover failed"))
198 .ok();
199 on_event(SyncEvent::Discovered { total });
200
201 let mut events = adapter.events_with(oracle);
202 let mut validator = IngestValidator::default();
203 let mut index = 0usize;
207 let mut in_flight: Option<InFlight> = None;
208 let mut pending_dones: std::collections::VecDeque<PendingDone> =
212 std::collections::VecDeque::new();
213 let mut decode_total = std::time::Duration::ZERO;
218 let mut decode_count = 0u64;
219 let mut validator_total = std::time::Duration::ZERO;
220 let mut validator_count = 0u64;
221 let run_started = std::time::Instant::now();
222
223 loop {
224 let decode_start = std::time::Instant::now();
225 let next = events.next().await;
226 decode_total += decode_start.elapsed();
227 decode_count += 1;
228 let event = match next {
229 Some(event) => event,
230 None => break,
231 };
232 match event {
233 Ok(AdapterYield::Skipped {
234 session_id,
235 project,
236 reason,
237 }) => {
238 let status = match reason {
239 SkipReason::Fresh => {
240 summary.skipped_fresh += 1;
241 SyncStatus::Fresh
242 }
243 SkipReason::Empty => {
244 summary.skipped_empty += 1;
245 SyncStatus::Empty
246 }
247 SkipReason::Unsupported(reason) => {
248 summary.skipped_files += 1;
249 SyncStatus::Skipped { reason }
250 }
251 };
252 on_event(SyncEvent::SessionDone(SessionOutcome {
253 project,
254 session_id,
255 messages: 0,
256 status,
257 }));
258 }
259 Ok(AdapterYield::Event(event)) => {
260 if matches!(&event, IngestEvent::Session(_))
265 && let Some(prev) = in_flight.take()
266 {
267 pending_dones.push_back(PendingDone {
268 project: prev.project,
269 session_id: prev.session_id,
270 messages: prev.messages,
271 dropped_events: prev.dropped_events,
272 first_drop_reason: prev.first_drop_reason,
273 session_index: prev.session_index,
274 });
275 }
276 let event_index = index;
277 match &event {
278 IngestEvent::Session(session) => {
279 in_flight = Some(InFlight {
280 project: Some((*session.project).clone()),
281 session_id: session.id.clone(),
282 messages: 0,
283 dropped_events: 0,
284 first_drop_reason: None,
285 session_index: event_index,
286 });
287 }
288 IngestEvent::Message(_) => {
289 if let Some(slot) = in_flight.as_mut() {
290 slot.messages += 1;
291 }
292 }
293 IngestEvent::Part(_) => {}
294 }
295
296 let validator_start = std::time::Instant::now();
297 let push_outcomes = validator.push(store, index, event).await?;
298 validator_total += validator_start.elapsed();
299 validator_count += 1;
300 for outcome in &push_outcomes {
307 if matches!(outcome.status, OutcomeStatus::Error)
308 && outcome.kind != "session"
309 && let Some(slot) = in_flight.as_mut()
310 {
311 slot.dropped_events += 1;
312 if slot.first_drop_reason.is_none() {
313 slot.first_drop_reason =
314 outcome.error.as_ref().map(|err| err.message.clone());
315 }
316 }
317 }
318 summary.add_outcomes(&push_outcomes);
319 index += 1;
320
321 if validator.pending_substreams() >= ADAPTER_FLUSH_BATCH {
326 let flush_start = std::time::Instant::now();
327 let (flush_outcomes, flush_counts) = validator.flush(store).await?;
328 validator_total += flush_start.elapsed();
329 validator_count += 1;
330 summary.add_outcomes_errors_only(&flush_outcomes);
334 summary.add_batch(&flush_counts);
335 drain_pending_dones(&mut pending_dones, &flush_outcomes, &mut on_event);
336 }
337 }
338 Err(error) => {
339 tracing::debug!(
345 %error,
346 "adapter event error (per-line drop by design)"
347 );
348 match in_flight.as_mut() {
349 Some(slot) => {
350 slot.dropped_events += 1;
354 if slot.first_drop_reason.is_none() {
355 slot.first_drop_reason = Some(error.to_string());
356 }
357 summary.dropped_events += 1;
358 }
359 None => {
360 summary.skipped_files += 1;
365 on_event(SyncEvent::SessionDone(SessionOutcome {
366 project: None,
367 session_id: None,
368 messages: 0,
369 status: SyncStatus::Skipped {
370 reason: error.to_string(),
371 },
372 }));
373 }
374 }
375 }
376 }
377 }
378
379 if let Some(prev) = in_flight.take() {
380 pending_dones.push_back(PendingDone {
381 project: prev.project,
382 session_id: prev.session_id,
383 messages: prev.messages,
384 dropped_events: prev.dropped_events,
385 first_drop_reason: prev.first_drop_reason,
386 session_index: prev.session_index,
387 });
388 }
389 let validator_start = std::time::Instant::now();
390 let (final_outcomes, final_counts) = validator.finish(store).await?;
391 validator_total += validator_start.elapsed();
392 validator_count += 1;
393 summary.add_outcomes_errors_only(&final_outcomes);
394 summary.add_batch(&final_counts);
395 drain_pending_dones(&mut pending_dones, &final_outcomes, &mut on_event);
396
397 summary.truncated_values = crate::adapter::extract::truncated_values_count()
398 .saturating_sub(truncations_before) as usize;
399
400 let total = run_started.elapsed();
401 let other = total
402 .saturating_sub(decode_total)
403 .saturating_sub(validator_total);
404 tracing::info!(
405 target: "pond::perf",
406 total_ms = total.as_millis() as u64,
407 decode_ms = decode_total.as_millis() as u64,
408 validator_ms = validator_total.as_millis() as u64,
409 other_ms = other.as_millis() as u64,
410 decode_calls = decode_count,
411 validator_calls = validator_count,
412 rows_inserted = summary.inserted as u64,
413 rows_matched = summary.matched as u64,
414 dropped_events = summary.dropped_events as u64,
415 dropped_sessions = summary.dropped_sessions as u64,
416 skipped_files = summary.skipped_files as u64,
417 skipped_fresh = summary.skipped_fresh as u64,
418 truncated_values = summary.truncated_values as u64,
419 "ingest_adapter complete"
420 );
421 Ok(summary)
422 }
423
424 fn drain_pending_dones<F>(
431 queue: &mut std::collections::VecDeque<PendingDone>,
432 outcomes: &[RowOutcome],
433 on_event: &mut F,
434 ) where
435 F: FnMut(SyncEvent),
436 {
437 let mut session_outcome_by_index: std::collections::HashMap<usize, &RowOutcome> =
440 std::collections::HashMap::new();
441 for outcome in outcomes {
442 if outcome.kind == "session" {
443 session_outcome_by_index.insert(outcome.index, outcome);
444 }
445 }
446
447 while let Some(done) = queue.pop_front() {
448 let session_outcome = session_outcome_by_index.get(&done.session_index).copied();
449 let rejection_reason = session_outcome.and_then(|outcome| {
450 if matches!(outcome.status, OutcomeStatus::Error) {
451 Some(
452 outcome
453 .error
454 .as_ref()
455 .map(|err| err.message.clone())
456 .unwrap_or_else(|| "session-level rejection".to_owned()),
457 )
458 } else {
459 None
460 }
461 });
462 let status = if let Some(reason) = rejection_reason {
463 SyncStatus::Rejected { reason }
464 } else if done.dropped_events > 0 {
465 SyncStatus::Partial {
466 dropped_events: done.dropped_events,
467 first_drop_reason: done.first_drop_reason,
468 }
469 } else {
470 SyncStatus::Ok
471 };
472 on_event(SyncEvent::SessionDone(SessionOutcome {
473 project: done.project,
474 session_id: Some(done.session_id),
475 messages: done.messages,
476 status,
477 }));
478 }
479 }
480
481 pub async fn pond_ingest(store: &Store, request: IngestRequest) -> IngestEnvelope {
487 if let Err(envelope) = validate_protocol(request.protocol_version) {
488 return IngestEnvelope::Error(envelope);
489 }
490 if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
491 return IngestEnvelope::Error(envelope);
492 }
493 if request.events.is_empty() {
494 return IngestEnvelope::Error(map_error(crate::Error::validation_field(
495 "events must be a non-empty array",
496 "events",
497 Some(serde_json::json!([])),
498 Some("non-empty array".to_owned()),
499 )));
500 }
501 if request.events.len() > MAX_INGEST_EVENTS {
502 return IngestEnvelope::Error(map_error(crate::Error::validation_field(
503 format!("ingest batch exceeds the event cap: at most {MAX_INGEST_EVENTS} events"),
504 "events",
505 Some(serde_json::json!(request.events.len())),
506 Some(format!("at most {MAX_INGEST_EVENTS} events")),
507 )));
508 }
509
510 match ingest_events(store, request.events).await {
511 Ok(outcomes) => {
512 let mut accepted = 0;
513 let mut rejected = 0;
514 for outcome in &outcomes {
515 match outcome.status {
516 OutcomeStatus::Inserted | OutcomeStatus::Matched => accepted += 1,
517 OutcomeStatus::Error => rejected += 1,
518 }
519 }
520 let results = outcomes
521 .into_iter()
522 .map(outcome_to_result)
523 .collect::<Vec<_>>();
524 IngestEnvelope::Success(IngestResponse {
525 accepted,
526 rejected,
527 results,
528 })
529 }
530 Err(failure) => IngestEnvelope::Error(map_storage(failure)),
531 }
532 }
533
534 pub async fn ingest_events(store: &Store, events: Vec<IngestEvent>) -> Result<Vec<RowOutcome>> {
540 let mut validator = IngestValidator::default();
541 let mut outcomes = Vec::with_capacity(events.len());
542 for (index, event) in events.into_iter().enumerate() {
543 let mut chunk = validator.push(store, index, event).await?;
544 outcomes.append(&mut chunk);
545 }
546 let (mut tail, _counts) = validator.finish(store).await?;
549 outcomes.append(&mut tail);
550 outcomes.sort_by_key(|outcome| outcome.index);
551 Ok(outcomes)
552 }
553
554 fn outcome_to_result(outcome: RowOutcome) -> IngestResult {
555 let (status, error) = match (outcome.status, outcome.error) {
556 (OutcomeStatus::Inserted, _) => (IngestStatus::Inserted, None),
557 (OutcomeStatus::Matched, _) => (IngestStatus::Matched, None),
558 (OutcomeStatus::Error, error) => {
559 let body = error
560 .map(|err| {
561 let mut details = serde_json::Map::new();
562 if let Some(field) = err.field {
563 details.insert("field".to_owned(), serde_json::json!(field));
564 }
565 if let Some(reason) = err.reason {
566 details.insert("reason".to_owned(), serde_json::json!(reason));
567 }
568 ErrorBody {
569 code: ErrorCode::ValidationFailed,
570 message: err.message,
571 details: serde_json::Value::Object(details),
572 }
573 })
574 .unwrap_or_else(|| ErrorBody {
575 code: ErrorCode::ValidationFailed,
576 message: "ingest failed".to_owned(),
577 details: serde_json::json!({}),
578 });
579 (IngestStatus::Error, Some(body))
580 }
581 };
582 IngestResult {
583 index: outcome.index,
584 kind: outcome.kind.to_owned(),
585 pk: outcome.pk,
586 status,
587 error,
588 }
589 }
590}
591
592pub use crate::sessions::{IngestEvent, IngestSummary, IngestValidator, search_text};
593pub use ingest_handler::{
594 MAX_INGEST_EVENTS, SessionOutcome, SyncEvent, SyncStatus, ingest_adapter, ingest_events,
595 pond_ingest,
596};
597
598mod export_handler {
599 use anyhow::{Context, Result};
610 use tokio::io::{AsyncWrite, AsyncWriteExt};
611
612 use crate::sessions::{IngestEvent, Store};
613
/// Counts of the events written by one `pond_export` run.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ExportSummary {
    /// Session events written.
    pub sessions: usize,
    /// Message events written.
    pub messages: usize,
    /// Part events written.
    pub parts: usize,
}
620
621 pub async fn pond_export<W>(
622 store: &Store,
623 session_filter: Option<&str>,
624 writer: &mut W,
625 ) -> Result<ExportSummary>
626 where
627 W: AsyncWrite + Unpin,
628 {
629 let mut session_ids = match session_filter {
630 Some(id) => vec![id.to_owned()],
631 None => store.session_ids().await?,
632 };
633 session_ids.sort();
634
635 let mut summary = ExportSummary::default();
636 for session_id in session_ids {
637 let Some(stored) = store
638 .get_session(&session_id)
639 .await
640 .with_context(|| format!("export: failed to load session {session_id}"))?
641 else {
642 if session_filter.is_some() {
643 anyhow::bail!("export: session not found: {session_id}");
644 }
645 continue;
646 };
647 write_event(writer, &IngestEvent::Session(stored.session)).await?;
648 summary.sessions += 1;
649 for message_with_parts in stored.messages {
650 write_event(writer, &IngestEvent::Message(message_with_parts.message)).await?;
651 summary.messages += 1;
652 for part in message_with_parts.parts {
653 write_event(writer, &IngestEvent::Part(part)).await?;
654 summary.parts += 1;
655 }
656 }
657 }
658 writer.flush().await.context("export: flush failed")?;
659 Ok(summary)
660 }
661
662 async fn write_event<W>(writer: &mut W, event: &IngestEvent) -> Result<()>
663 where
664 W: AsyncWrite + Unpin,
665 {
666 let line = serde_json::to_string(event).context("export: serialize event")?;
667 writer
668 .write_all(line.as_bytes())
669 .await
670 .context("export: write event")?;
671 writer
672 .write_all(b"\n")
673 .await
674 .context("export: write newline")?;
675 Ok(())
676 }
677}
678
679pub use export_handler::{ExportSummary, pond_export};
680
681mod restore_handler {
682 use anyhow::{Context, Result, bail};
689
690 use crate::sessions::{SessionWithMessages, Store};
691
692 pub async fn restore_lineage(
693 store: &Store,
694 session_id: &str,
695 ) -> Result<Vec<SessionWithMessages>> {
696 let Some(parent) = store.get_session(session_id).await? else {
697 bail!("export: session not found: {session_id}");
698 };
699 let mut sessions = vec![parent];
700 for child in store.child_sessions(session_id).await? {
701 if !store.child_sessions(&child.id).await?.is_empty() {
702 bail!(
703 "adapter-lineage-complete-restore supports one subagent level; session {} has child sessions",
704 child.id
705 );
706 }
707 let child_id = child.id;
708 let stored = store
709 .get_session(&child_id)
710 .await?
711 .with_context(|| format!("export: child session disappeared: {child_id}"))?;
712 sessions.push(stored);
713 }
714 Ok(sessions)
715 }
716}
717
718pub use restore_handler::restore_lineage;
719
720mod get_handler {
721 use crate::{
722 sessions::{GetLookup, MessageViewParams, RetrievedMessage, SessionViewParams, Store},
723 wire::{
724 GetEnvelope, GetRequest, GetResponse, GetResult, GetSession, MessageView, PartSummary,
725 ResponseMode, ResponsePart, validate_protocol,
726 },
727 };
728
729 use super::{map_error, map_storage};
730
731 fn to_message_view(message: RetrievedMessage, verbatim: bool) -> MessageView {
736 if verbatim {
737 return MessageView {
738 id: message.id,
739 role: message.role,
740 timestamp: message.timestamp,
741 text: None,
742 content: None,
743 parts_summary: Vec::new(),
744 parts: Some(
745 message
746 .parts
747 .into_iter()
748 .map(ResponsePart::from_part)
749 .collect(),
750 ),
751 };
752 }
753 let parts_summary = message
754 .parts
755 .iter()
756 .filter_map(|part| PartSummary::for_kind(&part.kind))
757 .collect();
758 MessageView {
759 id: message.id,
760 role: message.role,
761 timestamp: message.timestamp,
762 text: message.text,
763 content: message.content,
764 parts_summary,
765 parts: None,
766 }
767 }
768
/// Value passed as `budget_bytes` to the store's session and message view queries.
const BUDGET_BYTES: usize = 200_000;
774
775 pub async fn pond_get(store: &Store, request: GetRequest) -> GetEnvelope {
776 if let Err(error) = validate_protocol(request.protocol_version) {
777 return GetEnvelope::Error(error);
778 }
779 if let Err(envelope) = super::resolve_namespace(request.namespace.as_deref()) {
780 return GetEnvelope::Error(envelope);
781 }
782
783 let response = match (&request.session_id, &request.message_id) {
784 (Some(session_id), None) => session_result(store, session_id, &request).await,
785 (None, Some(message_id)) => message_result(store, message_id, &request).await,
786 (Some(_), Some(_)) => Err(map_error(crate::Error::validation_field(
787 "session_id and message_id are mutually exclusive",
788 "message_id",
789 request.message_id.clone().map(serde_json::Value::String),
790 Some("omit when session_id is present".to_owned()),
791 ))),
792 (None, None) => Err(map_error(crate::Error::validation(
793 "one of session_id or message_id is required",
794 ))),
795 };
796
797 match response {
798 Ok(response) => GetEnvelope::Success(response),
799 Err(error) => GetEnvelope::Error(error),
800 }
801 }
802
803 fn unknown_after_id(request: &GetRequest, anchor_of: &str) -> crate::wire::ErrorEnvelope {
806 map_error(crate::Error::validation_field(
807 "after_id not found (stale or mistyped continuation anchor)",
808 "after_id",
809 request.after_id.clone().map(serde_json::Value::String),
810 Some(format!("a {anchor_of} from a prior page of this read")),
811 ))
812 }
813
814 async fn session_result(
815 store: &Store,
816 session_id: &str,
817 request: &GetRequest,
818 ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
819 let params = SessionViewParams {
820 mode: request.response_mode,
821 after_id: request.after_id.as_deref(),
822 limit: request.limit,
823 budget_bytes: BUDGET_BYTES,
824 };
825 let view = match store
826 .session_view(session_id, params)
827 .await
828 .map_err(map_storage)?
829 {
830 GetLookup::NotFound => {
831 return Err(map_error(crate::Error::not_found(
832 "session",
833 serde_json::json!(session_id),
834 format!("session not found: {session_id}"),
835 )));
836 }
837 GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "message id")),
838 GetLookup::Found(view) => view,
839 };
840 let verbatim = matches!(request.response_mode, ResponseMode::Verbatim);
841 Ok(GetResponse {
842 session: GetSession::from_session(&view.session),
843 result: GetResult::Session {
844 messages: view
845 .messages
846 .into_iter()
847 .map(|message| to_message_view(message, verbatim))
848 .collect(),
849 messages_remaining: view.messages_remaining,
850 },
851 })
852 }
853
854 async fn message_result(
855 store: &Store,
856 message_id: &str,
857 request: &GetRequest,
858 ) -> Result<GetResponse, crate::wire::ErrorEnvelope> {
859 let params = MessageViewParams {
860 context_depth: request.context_depth,
861 after_id: request.after_id.as_deref(),
862 limit: request.limit,
863 budget_bytes: BUDGET_BYTES,
864 };
865 let view = match store
866 .message_view(message_id, params)
867 .await
868 .map_err(map_storage)?
869 {
870 GetLookup::NotFound => {
871 return Err(map_error(crate::Error::not_found(
872 "message",
873 serde_json::json!(message_id),
874 format!("message not found: {message_id}"),
875 )));
876 }
877 GetLookup::UnknownAfterId => return Err(unknown_after_id(request, "part id")),
878 GetLookup::Found(view) => view,
879 };
880 let target = MessageView {
883 id: view.target.id,
884 role: view.target.role,
885 timestamp: view.target.timestamp,
886 text: None,
887 content: None,
888 parts_summary: Vec::new(),
889 parts: None,
890 };
891 Ok(GetResponse {
892 session: GetSession::from_session(&view.session),
893 result: GetResult::Message {
894 target,
895 target_parts: view
896 .target_parts
897 .into_iter()
898 .map(ResponsePart::from_part)
899 .collect(),
900 target_parts_remaining: view.target_parts_remaining,
901 siblings: view
902 .siblings
903 .into_iter()
904 .map(|sibling| to_message_view(sibling, false))
905 .collect(),
906 },
907 })
908 }
909}
910
911pub use get_handler::pond_get;
912
913mod search_handler {
914 use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
919
920 use crate::{
921 Clock, SystemClock,
922 embed::{Embedder, LazyEmbedder, format_query},
923 sessions::{MessageKey, MessageMeta, Store},
924 substrate::{Predicate, ScalarValue},
925 wire::{
926 ErrorEnvelope, PartSummary, ProjectFilter, Role, SearchCursor, SearchEnvelope,
927 SearchFilters, SearchRequest, SearchResponse, SearchResult, SearchSession,
928 validate_protocol,
929 },
930 };
931 use chrono::NaiveDate;
932 use std::collections::HashMap;
933
934 use super::{map_error, map_storage};
935
/// Which retrievers a search runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
    /// Full-text and vector retrieval, fused into one ranking.
    Hybrid,
    /// Full-text retrieval only.
    Fts,
    /// Vector retrieval only.
    Vector,
}
946
/// A validated, normalised search request, as produced by `plan_search`.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchPlan {
    /// Retrieval mode. `plan_search` stores the mode it is given; the search
    /// entry points overwrite it before retrieving.
    pub mode: SearchMode,
    /// Query text with surrounding whitespace trimmed.
    pub query: String,
    /// Trimmed, non-empty message id to search by similarity, if requested.
    pub similar_to: Option<String>,
    /// Storage predicate built from `filters`.
    pub filter: Predicate,
    /// The wire filters the plan was built from.
    pub filters: SearchFilters,
    /// Candidate pool size for full-text retrieval: five times `limit`, at least 50.
    pub pool: usize,
    /// Candidate pool size for vector retrieval: twice `pool`.
    pub vector_pool: usize,
    /// Requested result limit, capped at `LIMIT_CAP`.
    pub limit: usize,
    /// Paging offset taken from the cursor, or 0 without a cursor.
    pub offset: usize,
    /// Minimum score a hit must reach, taken from `filters.min_score`.
    pub min_score: f64,
}
963
/// Upper bound applied to a request's `limit` by `plan_search`.
const LIMIT_CAP: usize = 200;
// NOTE(review): the next three constants are not used in the visible part of the
// file; their names suggest a per-session match cap, a response size budget and a
// snippet length — confirm against their use sites.
const MAX_MATCHES_PER_SESSION: usize = 3;
const SEARCH_BUDGET_BYTES: usize = 60_000;
const HIT_SNIPPET_CHARS: usize = 600;
/// Sum of the two fusion weights.
// NOTE(review): presumably used to normalise fused scores — confirm at the use site.
const SCORE_DENOMINATOR: f64 = FTS_FUSION_WEIGHT + VECTOR_FUSION_WEIGHT;

/// Weight of the full-text arm when hybrid search fuses its two rankings.
const FTS_FUSION_WEIGHT: f64 = 0.135;
/// Weight of the vector arm when hybrid search fuses its two rankings.
const VECTOR_FUSION_WEIGHT: f64 = 1.0;
983
984 fn encode_search_cursor(cursor: &SearchCursor) -> String {
985 #[allow(clippy::expect_used)]
986 let bytes = serde_json::to_vec(cursor).expect("search cursor encodes as JSON");
987 URL_SAFE_NO_PAD.encode(bytes)
988 }
989
990 fn decode_search_cursor(raw: &str) -> Result<SearchCursor, ErrorEnvelope> {
991 let bytes = URL_SAFE_NO_PAD.decode(raw).map_err(|_| {
992 map_error(crate::Error::validation_field(
993 "cursor is malformed (expected opaque value from a prior response)",
994 "cursor",
995 Some(serde_json::json!(raw)),
996 Some("opaque base64url".to_owned()),
997 ))
998 })?;
999 serde_json::from_slice(&bytes).map_err(|_| {
1000 map_error(crate::Error::validation_field(
1001 "cursor is malformed (decode failed)",
1002 "cursor",
1003 Some(serde_json::json!(raw)),
1004 Some("opaque cursor from a prior response".to_owned()),
1005 ))
1006 })
1007 }
1008
1009 pub async fn pond_search(
1018 store: &Store,
1019 embedder: &LazyEmbedder,
1020 request: SearchRequest,
1021 search: &crate::config::SearchConfig,
1022 ) -> SearchEnvelope {
1023 match run_search(store, embedder, request, search, &SystemClock).await {
1024 Ok(response) => SearchEnvelope::Success(response),
1025 Err(envelope) => SearchEnvelope::Error(envelope),
1026 }
1027 }
1028
1029 pub async fn explain_search_plan(
1030 store: &Store,
1031 embedder: &LazyEmbedder,
1032 request: SearchRequest,
1033 search: &crate::config::SearchConfig,
1034 ) -> Result<String, ErrorEnvelope> {
1035 let override_mode = request.mode_override.map(wire_mode_to_internal);
1036 let mut plan = plan_search(request, SearchMode::Fts)?;
1037 plan.mode = resolve_effective_mode(store, override_mode).await?;
1038 let mut out = String::new();
1039 if matches!(plan.mode, SearchMode::Fts | SearchMode::Hybrid) {
1040 let fts = store
1041 .explain_fts_plan(&plan.query, plan.pool, &plan.filter)
1042 .await
1043 .map_err(map_storage)?;
1044 out.push_str("fts:\n");
1045 out.push_str(&fts);
1046 out.push('\n');
1047 }
1048 if matches!(plan.mode, SearchMode::Vector | SearchMode::Hybrid) {
1049 let backend = load_embedder(embedder).await?;
1050 let vector = embed_query(backend.as_ref(), &plan.query)?;
1051 let vector_plan = store
1052 .explain_vector_plan(&vector, plan.vector_pool, &plan.filter, Some(search))
1053 .await
1054 .map_err(map_storage)?;
1055 out.push_str("vector:\n");
1056 out.push_str(&vector_plan);
1057 out.push('\n');
1058 }
1059 Ok(out)
1060 }
1061
1062 async fn run_search(
1063 store: &Store,
1064 embedder: &LazyEmbedder,
1065 request: SearchRequest,
1066 search: &crate::config::SearchConfig,
1067 _clock: &dyn Clock,
1068 ) -> Result<SearchResponse, ErrorEnvelope> {
1069 let override_mode = request.mode_override.map(wire_mode_to_internal);
1070 let mut plan = plan_search(request, SearchMode::Fts)?;
1071
1072 if plan.similar_to.is_some() {
1076 plan.mode = SearchMode::Vector;
1077 } else {
1078 plan.mode = resolve_effective_mode(store, override_mode).await?;
1082 }
1083
1084 let candidates = match plan.mode {
1085 SearchMode::Fts => {
1086 let hits = store
1087 .fts_search(&plan.query, plan.pool, &plan.filter)
1088 .await
1089 .map_err(map_storage)?;
1090 normalize_fts(hits)
1091 }
1092 SearchMode::Hybrid => {
1093 let backend = load_embedder(embedder).await?;
1094 let vector = embed_query(backend.as_ref(), &plan.query)?;
1095 let fts_fut = async {
1098 store
1099 .fts_search(&plan.query, plan.pool, &plan.filter)
1100 .await
1101 .map_err(map_storage)
1102 };
1103 let vector_fut = async {
1104 store
1105 .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
1106 .await
1107 .map_err(map_storage)
1108 };
1109 let (fts, vector_raw) = tokio::try_join!(fts_fut, vector_fut)?;
1110 let fts_max = fts.iter().map(|(_, s)| *s).fold(0.0_f32, f32::max);
1122 let fts_entries: Vec<(MessageKey, f64)> = fts
1123 .into_iter()
1124 .map(|(key, score)| {
1125 let normed = if fts_max > 0.0 {
1126 f64::from(score / fts_max)
1127 } else {
1128 0.0
1129 };
1130 (key, normed)
1131 })
1132 .collect();
1133 let vec_n = vector_raw.len() as f64;
1134 let vector_entries: Vec<(MessageKey, f64)> = vector_raw
1135 .into_iter()
1136 .enumerate()
1137 .map(|(idx, (key, _))| {
1138 let normed = if vec_n > 0.0 {
1139 1.0 - (idx as f64 / vec_n)
1140 } else {
1141 0.0
1142 };
1143 (key, normed)
1144 })
1145 .collect();
1146 let lists = [
1151 RankedList {
1152 retriever: RetrieverKind::Fts,
1153 entries: fts_entries,
1154 weight: FTS_FUSION_WEIGHT,
1155 },
1156 RankedList {
1157 retriever: RetrieverKind::Vector,
1158 entries: vector_entries,
1159 weight: VECTOR_FUSION_WEIGHT,
1160 },
1161 ];
1162 fuse_arms(&lists)
1163 .into_iter()
1164 .map(|hit| Candidate {
1165 session_id: hit.key.session_id,
1166 message_id: hit.key.message_id,
1167 base_score: hit.score,
1168 })
1169 .collect()
1170 }
1171 SearchMode::Vector => {
1178 let vector = if let Some(similar_id) = &plan.similar_to {
1179 let stored = store
1180 .message_vector_by_id(similar_id)
1181 .await
1182 .map_err(map_storage)?;
1183 let Some(vector) = stored else {
1184 return Err(map_error(crate::Error::not_found(
1185 "message",
1186 serde_json::json!(similar_id),
1187 format!(
1188 "no embedded message with id {similar_id} (the message may not \
1189 exist, or it exists but is not yet embedded - run `pond embed`)"
1190 ),
1191 )));
1192 };
1193 vector
1194 } else {
1195 let backend = load_embedder(embedder).await?;
1196 embed_query(backend.as_ref(), &plan.query)?
1197 };
1198 let vector_raw = store
1199 .vector_search(&vector, plan.vector_pool, &plan.filter, Some(search))
1200 .await
1201 .map_err(map_storage)?;
1202 normalize_vector(vector_raw)
1203 }
1204 };
1205
1206 if candidates.is_empty() {
1207 return Ok(empty_response());
1208 }
1209
1210 let keys = candidates
1213 .iter()
1214 .map(|candidate| MessageKey {
1215 session_id: candidate.session_id.clone(),
1216 message_id: candidate.message_id.clone(),
1217 })
1218 .collect::<Vec<_>>();
1219 let metas = store
1220 .message_metas_by_keys(&keys)
1221 .await
1222 .map_err(map_storage)?;
1223 let meta_index = metas
1224 .iter()
1225 .map(|meta| ((meta.session_id.as_str(), meta.message_id.as_str()), meta))
1226 .collect::<std::collections::HashMap<_, _>>();
1227
1228 let mut scored = Vec::with_capacity(candidates.len());
1229 for candidate in candidates {
1230 let Some(meta) =
1231 meta_index.get(&(candidate.session_id.as_str(), candidate.message_id.as_str()))
1232 else {
1233 continue;
1234 };
1235 let score = candidate.base_score;
1236 if score < plan.min_score {
1237 continue;
1238 }
1239 scored.push(ScoredHit {
1240 meta: (*meta).clone(),
1241 score,
1242 });
1243 }
1244 scored.sort_by(|left, right| {
1245 right
1246 .score
1247 .partial_cmp(&left.score)
1248 .unwrap_or(std::cmp::Ordering::Equal)
1249 .then_with(|| left.meta.session_id.cmp(&right.meta.session_id))
1250 .then_with(|| left.meta.message_id.cmp(&right.meta.message_id))
1251 });
1252
1253 let matched_total = scored.len();
1254 let sessions = build_sessions(store, &scored, &plan.query).await?;
1255 page_sessions(sessions, matched_total, &plan)
1256 }
1257
1258 async fn resolve_effective_mode(
1263 store: &Store,
1264 override_mode: Option<SearchMode>,
1265 ) -> Result<SearchMode, ErrorEnvelope> {
1266 if let Some(mode) = override_mode {
1267 return Ok(mode);
1268 }
1269 let has = store.has_embeddings().await.map_err(map_storage)?;
1270 Ok(if has {
1271 SearchMode::Hybrid
1272 } else {
1273 SearchMode::Fts
1274 })
1275 }
1276
1277 async fn load_embedder(
1281 embedder: &LazyEmbedder,
1282 ) -> Result<std::sync::Arc<dyn Embedder>, ErrorEnvelope> {
1283 embedder.get().await.map_err(|error| {
1284 map_error(crate::Error::internal(format!(
1285 "embedder load failed: {error}"
1286 )))
1287 })
1288 }
1289
1290 pub fn plan_search(
1291 request: SearchRequest,
1292 mode: SearchMode,
1293 ) -> Result<SearchPlan, ErrorEnvelope> {
1294 validate_protocol(request.protocol_version)?;
1295
1296 let _ns = super::resolve_namespace(request.namespace.as_deref())?;
1297
1298 let cursor = match request.cursor.as_deref() {
1299 Some(raw) => Some(decode_search_cursor(raw)?),
1300 None => None,
1301 };
1302 let (query_raw, similar_raw, filters, offset) = match cursor {
1303 Some(cursor) => (
1304 cursor.query,
1305 cursor.similar_to,
1306 cursor.filters,
1307 cursor.offset,
1308 ),
1309 None => (request.query, request.similar_to, request.filters, 0),
1310 };
1311 let query = query_raw.trim().to_owned();
1312 let similar_to = similar_raw
1313 .as_ref()
1314 .map(|id| id.trim().to_owned())
1315 .filter(|id| !id.is_empty());
1316 if similar_to.is_none() && query.is_empty() {
1317 return Err(map_error(crate::Error::validation_field(
1318 "query must be non-empty after trim",
1319 "query",
1320 Some(serde_json::json!(query_raw)),
1321 Some("non-empty string after trim, or pass `similar_to`".to_owned()),
1322 )));
1323 }
1324 if request.limit == 0 {
1325 return Err(map_error(crate::Error::validation_field(
1326 "limit must be at least 1",
1327 "limit",
1328 Some(serde_json::json!(request.limit)),
1329 Some("integer >= 1".to_owned()),
1330 )));
1331 }
1332 let limit = request.limit.min(LIMIT_CAP);
1333 let min_score = filters.min_score;
1334 let filter = build_filter(&filters)?;
1335 let pool = limit.saturating_mul(5).max(50);
1338 Ok(SearchPlan {
1339 mode,
1340 query,
1341 similar_to,
1342 filter,
1343 filters,
1344 pool,
1345 vector_pool: pool.saturating_mul(2),
1346 limit,
1347 offset,
1348 min_score,
1349 })
1350 }
1351
1352 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1353 pub enum RetrieverKind {
1354 Vector,
1355 Fts,
1356 }
1357
1358 impl RetrieverKind {
1359 fn as_wire(self) -> &'static str {
1360 match self {
1361 Self::Vector => "vector",
1362 Self::Fts => "fts",
1363 }
1364 }
1365 }
1366
    /// One retriever's ranked output, as consumed by `fuse_arms`.
    pub struct RankedList {
        /// Retriever that produced this list; its wire label ends up in
        /// `FusedHit::matched_via`.
        pub retriever: RetrieverKind,
        /// Message keys paired with the retriever's raw scores. `fuse_arms`
        /// rescales the scores min-max within this list before weighting them.
        pub entries: Vec<(MessageKey, f64)>,
        /// Multiplier applied to each normalised score before it is added to the
        /// fused total.
        pub weight: f64,
    }
1378
1379 fn wire_mode_to_internal(wire: crate::wire::SearchModeWire) -> SearchMode {
1382 match wire {
1383 crate::wire::SearchModeWire::Fts => SearchMode::Fts,
1384 crate::wire::SearchModeWire::Vector => SearchMode::Vector,
1385 crate::wire::SearchModeWire::Hybrid => SearchMode::Hybrid,
1386 }
1387 }
1388
    /// A fused search hit produced by `fuse_arms`; one is emitted per session
    /// root.
    #[derive(Debug, Clone, PartialEq)]
    pub struct FusedHit {
        /// Key of the first entry seen for this session root while walking the
        /// lists in order.
        pub key: MessageKey,
        /// Sum of the weighted, normalised contributions from every list that
        /// matched this session root.
        pub score: f64,
        /// Wire labels of the retrievers that contributed, in list order.
        pub matched_via: Vec<String>,
    }
1396
1397 fn session_root(session_id: &str) -> &str {
1402 match session_id.find('/') {
1403 Some(idx) => &session_id[..idx],
1404 None => session_id,
1405 }
1406 }
1407
1408 pub fn fuse_arms(lists: &[RankedList]) -> Vec<FusedHit> {
1431 let mut merged: std::collections::HashMap<String, (f64, Vec<String>, MessageKey)> =
1432 std::collections::HashMap::new();
1433 for list in lists {
1434 if list.entries.is_empty() {
1435 continue;
1436 }
1437 let mut lo = f64::INFINITY;
1446 let mut hi = f64::NEG_INFINITY;
1447 for (_, raw) in &list.entries {
1448 if *raw < lo {
1449 lo = *raw;
1450 }
1451 if *raw > hi {
1452 hi = *raw;
1453 }
1454 }
1455 let range = hi - lo;
1456 let mut seen_in_arm: std::collections::HashSet<String> =
1461 std::collections::HashSet::new();
1462 for (key, raw) in &list.entries {
1463 let root = session_root(&key.session_id).to_owned();
1464 if !seen_in_arm.insert(root.clone()) {
1465 continue;
1466 }
1467 let norm = if range > 0.0 { (raw - lo) / range } else { 0.0 };
1468 let contribution = list.weight * norm;
1469 let entry = merged
1470 .entry(root)
1471 .or_insert_with(|| (0.0, Vec::new(), key.clone()));
1472 entry.0 += contribution;
1473 entry.1.push(list.retriever.as_wire().to_owned());
1474 }
1475 }
1476 let mut hits = merged
1477 .into_values()
1478 .map(|(score, matched_via, key)| FusedHit {
1479 key,
1480 score,
1481 matched_via,
1482 })
1483 .collect::<Vec<_>>();
1484 hits.sort_by(|left, right| {
1485 right
1486 .score
1487 .partial_cmp(&left.score)
1488 .unwrap_or(std::cmp::Ordering::Equal)
1489 .then_with(|| left.key.cmp(&right.key))
1490 });
1491 hits
1492 }
1493
    /// Minimum length, in chars, for a query term to count as informative when
    /// `query_snippet` picks the position to centre the snippet on. Shorter
    /// terms are only used when no term reaches this length.
    const ANCHOR_MIN_TERM_CHARS: usize = 4;
1499
1500 pub fn hit_payload(text: &str, query: &str) -> String {
1505 let chars_len = text.chars().count();
1506 if chars_len <= HIT_SNIPPET_CHARS {
1507 return text.to_owned();
1508 }
1509 query_snippet(text, query)
1510 }
1511
1512 fn query_snippet(text: &str, query: &str) -> String {
1528 let lower_text = text.to_lowercase();
1529 let terms: Vec<String> = query
1530 .split_whitespace()
1531 .filter(|term| !term.is_empty())
1532 .map(str::to_lowercase)
1533 .collect();
1534 let any_informative = terms
1535 .iter()
1536 .any(|term| term.chars().count() >= ANCHOR_MIN_TERM_CHARS);
1537 let hit = terms
1538 .iter()
1539 .filter(|term| !any_informative || term.chars().count() >= ANCHOR_MIN_TERM_CHARS)
1540 .filter_map(|term| lower_text.find(term.as_str()))
1541 .min();
1542 let chars: Vec<char> = text.chars().collect();
1543 let center = hit
1547 .map(|byte| lower_text[..byte].chars().count())
1548 .unwrap_or(0);
1549 let half = HIT_SNIPPET_CHARS / 2;
1550 let start = center.saturating_sub(half);
1551 let end = (start + HIT_SNIPPET_CHARS).min(chars.len());
1552 let start = end.saturating_sub(HIT_SNIPPET_CHARS);
1553 let mut snippet = String::new();
1557 if start > 0 {
1558 snippet.push_str(&format!("[{start} chars before] "));
1559 }
1560 snippet.extend(&chars[start..end]);
1561 if end < chars.len() {
1562 snippet.push_str(&format!(
1563 " [+{} more chars; pond_get for full]",
1564 chars.len() - end
1565 ));
1566 }
1567 snippet
1568 }
1569
    /// A retrieved message with a retriever-independent base score, as produced
    /// by `normalize_fts` and `normalize_vector`.
    struct Candidate {
        /// Session the message belongs to.
        session_id: String,
        /// Identifier of the message.
        message_id: String,
        /// Normalised score assigned by the producing `normalize_*` function.
        base_score: f64,
    }
1575
    /// A message's metadata together with its search score, before the score is
    /// normalised for the wire by `to_search_result`.
    struct ScoredHit {
        /// Stored metadata of the matched message.
        meta: MessageMeta,
        /// Un-normalised score; `to_search_result` passes it through
        /// `normalize_score`.
        score: f64,
    }
1580
1581 impl ScoredHit {
1582 fn to_search_result(
1583 &self,
1584 query: &str,
1585 summaries: &HashMap<(String, String), Vec<PartSummary>>,
1586 ) -> Result<SearchResult, ErrorEnvelope> {
1587 let text = hit_payload(&self.meta.search_text, query);
1588 let role = match self.meta.role.as_str() {
1589 "system" => Role::System,
1590 "user" => Role::User,
1591 "assistant" => Role::Assistant,
1592 "tool" => Role::Tool,
1593 other => {
1594 return Err(map_error(crate::Error::internal(format!(
1595 "stored message has unknown role: {other}"
1596 ))));
1597 }
1598 };
1599 let parts_summary = if matches!(role, Role::User) {
1602 summaries
1603 .get(&(self.meta.session_id.clone(), self.meta.message_id.clone()))
1604 .cloned()
1605 .unwrap_or_default()
1606 } else {
1607 Vec::new()
1608 };
1609 Ok(SearchResult {
1610 message_id: self.meta.message_id.clone(),
1611 role,
1612 timestamp: self.meta.timestamp,
1613 text,
1614 score: normalize_score(self.score),
1615 parts_summary,
1616 })
1617 }
1618 }
1619
1620 fn normalize_score(score: f64) -> f64 {
1621 (score / SCORE_DENOMINATOR).clamp(0.0, 1.0)
1622 }
1623
1624 fn normalize_fts(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1625 let max = hits.iter().map(|(_, score)| *score).fold(0.0_f32, f32::max);
1626 hits.into_iter()
1627 .map(|(key, score)| Candidate {
1628 session_id: key.session_id,
1629 message_id: key.message_id,
1630 base_score: if max > 0.0 {
1631 f64::from(score / max)
1632 } else {
1633 0.0
1634 },
1635 })
1636 .collect()
1637 }
1638
1639 fn normalize_vector(hits: Vec<(MessageKey, f32)>) -> Vec<Candidate> {
1640 let n = hits.len() as f64;
1641 hits.into_iter()
1642 .enumerate()
1643 .map(|(idx, (key, _))| Candidate {
1644 session_id: key.session_id,
1645 message_id: key.message_id,
1646 base_score: if n > 0.0 { 1.0 - (idx as f64 / n) } else { 0.0 },
1647 })
1648 .collect()
1649 }
1650
1651 fn embed_query(embedder: &dyn Embedder, query: &str) -> Result<Vec<f32>, ErrorEnvelope> {
1652 let prompt = format_query(query);
1653 let vectors =
1657 tokio::task::block_in_place(|| embedder.embed(&[prompt])).map_err(|error_value| {
1658 map_error(crate::Error::internal(format!(
1659 "failed to embed query: {error_value}"
1660 )))
1661 })?;
1662 vectors.into_iter().next().ok_or_else(|| {
1663 map_error(crate::Error::internal(
1664 "embedder returned no vector for query",
1665 ))
1666 })
1667 }
1668
1669 async fn build_sessions(
1670 store: &Store,
1671 scored: &[ScoredHit],
1672 query: &str,
1673 ) -> Result<Vec<SearchSession>, ErrorEnvelope> {
1674 use std::collections::BTreeMap;
1675
1676 struct Acc {
1677 project: String,
1678 source_agent: String,
1679 matched_count: usize,
1680 matches: Vec<SearchResult>,
1681 }
1682 let mut user_ids_by_session: BTreeMap<String, Vec<String>> = BTreeMap::new();
1686 for hit in scored {
1687 if hit.meta.role == "user" {
1688 user_ids_by_session
1689 .entry(hit.meta.session_id.clone())
1690 .or_default()
1691 .push(hit.meta.message_id.clone());
1692 }
1693 }
1694 let mut summaries: HashMap<(String, String), Vec<PartSummary>> = HashMap::new();
1695 for (session_id, message_ids) in &user_ids_by_session {
1696 for (key, parts) in store
1697 .summary_parts_for_messages(session_id, message_ids)
1698 .await
1699 .map_err(map_storage)?
1700 {
1701 summaries.insert(
1702 key,
1703 parts
1704 .iter()
1705 .filter_map(|part| PartSummary::for_kind(&part.kind))
1706 .collect(),
1707 );
1708 }
1709 }
1710
1711 let mut groups: BTreeMap<String, Acc> = BTreeMap::new();
1712 for hit in scored {
1713 let root = session_root(&hit.meta.session_id).to_owned();
1714 let entry = groups.entry(root).or_insert_with(|| Acc {
1715 project: hit.meta.project.clone(),
1716 source_agent: hit.meta.source_agent.clone(),
1717 matched_count: 0,
1718 matches: Vec::new(),
1719 });
1720 entry.matched_count += 1;
1721 entry.matches.push(hit.to_search_result(query, &summaries)?);
1722 }
1723
1724 let session_ids = groups.keys().cloned().collect::<Vec<_>>();
1725 let counts = store
1726 .session_message_counts(&session_ids)
1727 .await
1728 .map_err(map_storage)?;
1729
1730 let mut result = groups
1731 .into_iter()
1732 .map(|(session_id, mut acc)| {
1733 acc.matches.sort_by(|left, right| {
1734 right
1735 .score
1736 .partial_cmp(&left.score)
1737 .unwrap_or(std::cmp::Ordering::Equal)
1738 .then_with(|| left.message_id.cmp(&right.message_id))
1739 });
1740 acc.matches.truncate(MAX_MATCHES_PER_SESSION);
1741 SearchSession {
1742 session_messages_count: counts.get(&session_id).copied().unwrap_or_default(),
1743 session_id,
1744 project: acc.project,
1745 source_agent: acc.source_agent,
1746 matched_message_count: acc.matched_count,
1747 matches: acc.matches,
1748 }
1749 })
1750 .collect::<Vec<_>>();
1751 result.sort_by(|left, right| {
1752 let left_score = left
1753 .matches
1754 .first()
1755 .map(|hit| hit.score)
1756 .unwrap_or_default();
1757 let right_score = right
1758 .matches
1759 .first()
1760 .map(|hit| hit.score)
1761 .unwrap_or_default();
1762 right_score
1763 .partial_cmp(&left_score)
1764 .unwrap_or(std::cmp::Ordering::Equal)
1765 .then_with(|| left.session_id.cmp(&right.session_id))
1766 });
1767 Ok(result)
1768 }
1769
1770 fn page_sessions(
1771 sessions: Vec<SearchSession>,
1772 matched_total: usize,
1773 plan: &SearchPlan,
1774 ) -> Result<SearchResponse, ErrorEnvelope> {
1775 if plan.offset >= sessions.len() {
1776 return Ok(SearchResponse {
1777 sessions: Vec::new(),
1778 matched_total,
1779 has_more: false,
1780 next_cursor: None,
1781 });
1782 }
1783
1784 let mut emitted = Vec::new();
1785 let mut used_bytes = 0usize;
1786 for session in sessions.iter().skip(plan.offset) {
1787 if emitted.len() >= plan.limit {
1788 break;
1789 }
1790 let bytes = serde_json::to_string(session)
1791 .map_err(|error| {
1792 map_error(crate::Error::internal(format!(
1793 "failed to size search response session: {error}"
1794 )))
1795 })?
1796 .len();
1797 if !emitted.is_empty() && used_bytes.saturating_add(bytes) > SEARCH_BUDGET_BYTES {
1798 break;
1799 }
1800 used_bytes = used_bytes.saturating_add(bytes);
1801 emitted.push(session.clone());
1802 }
1803
1804 let next_offset = plan.offset + emitted.len();
1805 let has_more = next_offset < sessions.len();
1806 let next_cursor = has_more.then(|| {
1807 encode_search_cursor(&SearchCursor {
1808 query: plan.query.clone(),
1809 similar_to: plan.similar_to.clone(),
1810 filters: plan.filters.clone(),
1811 offset: next_offset,
1812 })
1813 });
1814
1815 Ok(SearchResponse {
1816 sessions: emitted,
1817 matched_total,
1818 has_more,
1819 next_cursor,
1820 })
1821 }
1822
1823 pub fn build_filter(filters: &SearchFilters) -> Result<Predicate, ErrorEnvelope> {
1827 let mut clauses = Vec::new();
1828
1829 match &filters.project {
1830 None => {}
1831 Some(ProjectFilter::Contains(value)) => {
1832 clauses.push(Predicate::LikeContains("project", value.clone()));
1833 }
1834 Some(ProjectFilter::Regex(pattern)) => {
1835 clauses.push(Predicate::Regex("project", pattern.clone()));
1836 }
1837 }
1838
1839 if let Some(session_id) = &filters.session_id {
1840 clauses.push(Predicate::Eq("session_id", session_id.clone().into()));
1841 }
1842 if let Some(source_agent) = &filters.source_agent {
1843 clauses.push(Predicate::Eq("source_agent", source_agent.clone().into()));
1844 }
1845 if let Some(role) = &filters.role {
1846 if !matches!(role.as_str(), "user" | "assistant" | "system" | "tool") {
1847 return Err(map_error(crate::Error::validation_field(
1848 format!(
1849 "filters.role must be one of: user, assistant, system, tool; got {role}"
1850 ),
1851 "filters.role",
1852 Some(serde_json::json!(role)),
1853 Some("one of: user, assistant, system, tool".to_owned()),
1854 )));
1855 }
1856 clauses.push(Predicate::Eq("role", role.clone().into()));
1857 }
1858 if let Some(from_date) = &filters.from_date {
1859 clauses.push(Predicate::Gte(
1860 "timestamp",
1861 ScalarValue::Raw(date_bound(from_date, "filters.from_date", false)?),
1862 ));
1863 }
1864 if let Some(to_date) = &filters.to_date {
1865 clauses.push(Predicate::Lte(
1866 "timestamp",
1867 ScalarValue::Raw(date_bound(to_date, "filters.to_date", true)?),
1868 ));
1869 }
1870
1871 Ok(Predicate::And(clauses))
1872 }
1873
1874 fn date_bound(date: &str, field: &str, end_of_day: bool) -> Result<String, ErrorEnvelope> {
1877 NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| {
1878 map_error(crate::Error::validation_field(
1879 format!("{field} must be in YYYY-MM-DD format; got {date}"),
1880 field,
1881 Some(serde_json::json!(date)),
1882 Some("YYYY-MM-DD".to_owned()),
1883 ))
1884 })?;
1885 let time = if end_of_day { "23:59:59" } else { "00:00:00" };
1886 Ok(format!("timestamp '{date} {time}'"))
1887 }
1888
    /// Builds a response with no sessions, a `matched_total` of 0, and no
    /// further pages.
    fn empty_response() -> SearchResponse {
        SearchResponse {
            sessions: Vec::new(),
            matched_total: 0,
            has_more: false,
            next_cursor: None,
        }
    }
1897
    /// Unit tests for the private fusion helpers `session_root` and `fuse_arms`.
    #[cfg(test)]
    mod fusion_helpers_tests {
        #![allow(clippy::expect_used, clippy::unwrap_used)]

        use super::*;

        /// `session_root` keeps everything before the first `/` and leaves ids
        /// without a `/` untouched.
        #[test]
        fn session_root_strips_agent_suffix_for_claude_code_subagents() {
            assert_eq!(
                session_root("94a50f23-1234-5678-9abc-def012345678"),
                "94a50f23-1234-5678-9abc-def012345678",
            );
            assert_eq!(
                session_root("94a50f23-1234-5678-9abc-def012345678/agent-abc123"),
                "94a50f23-1234-5678-9abc-def012345678",
            );
            // Only the first separator matters; deeper segments are dropped too.
            assert_eq!(session_root("root/a/b"), "root");
        }

        /// Within one arm only the first entry per session root counts, while a
        /// root matched by both arms is credited by each of them.
        #[test]
        fn fuse_arms_dedupes_intra_arm_by_session_root_and_credits_cross_arm() {
            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
                session_id: sid.to_owned(),
                message_id: mid.to_owned(),
            };
            // session-A appears three times here (once via a subagent id); only
            // msg-1 is counted for it.
            let fts = RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![
                    (mk("session-A", "msg-1"), 10.0),
                    (mk("session-A", "msg-2"), 9.0),
                    (mk("session-B", "msg-3"), 6.0),
                    (mk("session-A/agent-x", "msg-4"), 5.0),
                ],
                weight: 0.135,
            };
            let vec_arm = RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![
                    (mk("session-B", "msg-7"), 0.9),
                    (mk("session-A", "msg-9"), 0.6),
                ],
                weight: 1.0,
            };
            let merged = fuse_arms(&[fts, vec_arm]);
            assert_eq!(merged.len(), 2);
            // session-B tops the vector arm (weight 1.0), which outweighs
            // session-A's lead in the lightly weighted FTS arm.
            assert_eq!(merged[0].key.session_id, "session-B");
            // The representative key is the first one seen, i.e. from the FTS arm.
            assert_eq!(merged[0].key.message_id, "msg-3");
            assert_eq!(merged[0].matched_via, vec!["fts", "vector"]);
            assert_eq!(merged[1].key.session_id, "session-A");
            assert_eq!(merged[1].key.message_id, "msg-1");
            assert_eq!(merged[1].matched_via, vec!["fts", "vector"]);
        }

        /// An arm whose scores are all equal has a zero range and therefore adds
        /// nothing to any hit.
        #[test]
        fn fuse_arms_collapses_degenerate_tied_arm_to_zero_contribution() {
            let mk = |sid: &str, mid: &str| crate::sessions::MessageKey {
                session_id: sid.to_owned(),
                message_id: mid.to_owned(),
            };
            let fts = RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![(mk("session-A", "a"), 1.0), (mk("session-B", "b"), 1.0)],
                weight: 0.135,
            };
            let vec_arm = RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![(mk("session-A", "a"), 0.9), (mk("session-B", "b"), 0.3)],
                weight: 1.0,
            };
            let merged = fuse_arms(&[fts, vec_arm]);
            assert_eq!(merged[0].key.session_id, "session-A");
            // Scores come from the vector arm alone: top entry 1.0, bottom entry 0.0.
            assert!((merged[0].score - 1.0).abs() < 1e-9);
            assert!(merged[1].score.abs() < 1e-9);
        }
    }
1994}
1995
1996pub use search_handler::{
1997 FusedHit, RankedList, RetrieverKind, SearchMode, SearchPlan, build_filter, explain_search_plan,
1998 fuse_arms, hit_payload, plan_search, pond_search,
1999};
2000
/// Tests for the items re-exported from `search_handler`, plus a lineage
/// restore check.
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::wire::{ProjectFilter, SearchFilters, SearchRequest};
    use chrono::Utc;

    /// Builds a request for `query` in the `local` namespace with default
    /// filters, a limit of 20 and no cursor.
    fn search_request(query: &str) -> SearchRequest {
        SearchRequest {
            protocol_version: crate::PROTOCOL_VERSION,
            namespace: Some("local".to_owned()),
            query: query.to_owned(),
            mode_override: None,
            similar_to: None,
            filters: SearchFilters::default(),
            limit: 20,
            cursor: None,
        }
    }

    /// Shorthand for constructing a `MessageKey`.
    fn key(session: &str, id: &str) -> crate::sessions::MessageKey {
        crate::sessions::MessageKey {
            session_id: session.to_owned(),
            message_id: id.to_owned(),
        }
    }

    /// Hits found by both retrievers accumulate both contributions and list
    /// both labels; single-arm hits list only their own retriever.
    #[test]
    fn fuse_arms_fuses_retrievers_and_reports_provenance() {
        let lists = [
            RankedList {
                retriever: RetrieverKind::Vector,
                entries: vec![
                    (key("session-a", "a"), 0.9),
                    (key("session-b", "b"), 0.7),
                    (key("session-c", "c"), 0.5),
                ],
                weight: 1.0,
            },
            RankedList {
                retriever: RetrieverKind::Fts,
                entries: vec![
                    (key("session-b", "b"), 10.0),
                    (key("session-a", "a"), 8.0),
                    (key("session-d", "d"), 4.0),
                ],
                weight: 0.135,
            },
        ];
        let merged = fuse_arms(&lists);

        // session-a leads the heavily weighted vector arm, so it stays ahead of
        // session-b even though session-b leads the FTS arm.
        assert_eq!(merged[0].key.session_id, "session-a");
        assert_eq!(merged[1].key.session_id, "session-b");
        // Labels follow the order of the input lists.
        assert_eq!(merged[0].matched_via, vec!["vector", "fts"]);
        assert!(merged[0].score > merged[1].score);

        let c = merged
            .iter()
            .find(|hit| hit.key.session_id == "session-c")
            .unwrap();
        assert_eq!(c.matched_via, vec!["vector"]);
        let d = merged
            .iter()
            .find(|hit| hit.key.session_id == "session-d")
            .unwrap();
        assert_eq!(d.matched_via, vec!["fts"]);
    }

    /// Text within the snippet size is returned verbatim.
    #[test]
    fn hit_payload_returns_short_text_in_full() {
        let short = "a short message body";
        let text = hit_payload(short, "message");
        assert_eq!(text, short, "small text is returned as-is");
    }

    /// Long text is cut to a window that contains the (case-insensitively)
    /// matched term.
    #[test]
    fn hit_payload_windows_long_text_around_the_query_term() {
        // 2400 chars in total, with the match at char offset 2000.
        let body = format!("{}NEEDLE{}", "a".repeat(2000), "b".repeat(394));
        let text = hit_payload(&body, "needle");
        assert!(
            text.contains("NEEDLE"),
            "text is the match-windowed snippet: {text}"
        );
        // Allow 64 extra chars for the "chars before" / "more chars" markers.
        assert!(
            text.chars().count() <= 600 + 64,
            "snippet window is bounded by HIT_SNIPPET_CHARS plus markers: {}",
            text.chars().count()
        );
    }

    /// Lowercasing `İ` changes its encoded length; the snippet must still be
    /// produced and contain the matched term.
    #[test]
    fn hit_payload_snippet_survives_case_folding_that_changes_byte_length() {
        let body = format!("İÉÉÉ{}", "a".repeat(2100));
        let text = hit_payload(&body, "ééé");
        assert!(
            text.contains("ÉÉÉ"),
            "snippet windows on the matched term: {text}"
        );
    }

    /// With the chain a -> b -> c stored, restoring from `a` is rejected while
    /// restoring from `b` yields `b` and its child `c`.
    #[tokio::test]
    async fn restore_lineage_rejects_a_graph_nesting_deeper_than_one_level() {
        use crate::adapter::Extracted;
        use crate::sessions::Store;
        use crate::wire::{ProviderOptions, Session};
        use tempfile::TempDir;

        // Minimal session fixture; only the id and parent link vary.
        let session = |id: &str, parent: Option<&str>| Session {
            id: id.to_owned(),
            parent_session_id: parent.map(str::to_owned),
            parent_message_id: None,
            source_agent: "claude-code".to_owned(),
            created_at: Utc::now(),
            project: Extracted::from_test_value("/tmp/pond".to_owned()),
            options: ProviderOptions::new(),
        };

        let dir = TempDir::new().unwrap();
        let store = Store::open_local(dir.path()).await.unwrap();
        store
            .upsert_sessions(&[
                session("a", None),
                session("b", Some("a")),
                session("c", Some("b")),
            ])
            .await
            .unwrap();

        // From `a` the graph is two levels deep (a -> b -> c).
        let err = restore_lineage(&store, "a").await.unwrap_err();
        assert!(
            err.to_string().contains("one subagent level"),
            "expected the deeper-graph error, got: {err}"
        );

        // From `b` there is a single level below (b -> c).
        let lineage = restore_lineage(&store, "b").await.unwrap();
        let ids: Vec<&str> = lineage.iter().map(|s| s.session.id.as_str()).collect();
        assert_eq!(ids, ["b", "c"]);
    }

    /// Every populated filter shows up in the rendered predicate, and default
    /// filters render to an empty string.
    #[test]
    fn build_filter_pushes_down_each_predicate_and_handles_empty() {
        let filters = SearchFilters {
            project: Some(ProjectFilter::Contains("/Users/me/pond".to_owned())),
            session_id: Some("01HXY".to_owned()),
            source_agent: Some("claude-code".to_owned()),
            role: Some("assistant".to_owned()),
            from_date: Some("2026-01-01".to_owned()),
            to_date: Some("2026-05-01".to_owned()),
            min_score: 0.0,
        };
        let sql = build_filter(&filters).unwrap().to_lance();
        assert!(sql.contains("project LIKE '%/Users/me/pond%'"));
        assert!(sql.contains("session_id = '01HXY'"));
        assert!(sql.contains("source_agent = 'claude-code'"));
        assert!(sql.contains("role = 'assistant'"));
        assert!(sql.contains("timestamp >="));
        assert!(sql.contains("timestamp <="));

        assert_eq!(
            build_filter(&SearchFilters::default()).unwrap().to_lance(),
            "",
        );
    }

    /// An unknown role and a date that is not `YYYY-MM-DD` are both rejected.
    #[test]
    fn build_filter_rejects_bad_role_and_date() {
        let bad_role = SearchFilters {
            role: Some("wizard".to_owned()),
            ..SearchFilters::default()
        };
        assert!(build_filter(&bad_role).is_err());

        let bad_date = SearchFilters {
            from_date: Some("01-01-2026".to_owned()),
            ..SearchFilters::default()
        };
        assert!(build_filter(&bad_date).is_err());
    }

    /// A literal `_` in a `Contains` project filter must be escaped so it is
    /// not treated as a LIKE wildcard.
    #[test]
    fn build_filter_contains_escapes_like_wildcards() {
        let filters = SearchFilters {
            project: Some(ProjectFilter::Contains("/Users/me/my_project".to_owned())),
            ..SearchFilters::default()
        };
        let sql = build_filter(&filters).unwrap().to_lance();
        assert!(
            sql.contains(r"my\_project"),
            "underscore must be escaped: {sql}"
        );
        assert!(
            sql.contains(r"ESCAPE '\'"),
            "predicate must declare the escape char: {sql}"
        );
    }

    /// Covers query trimming, limit capping, pool sizing and filter pushdown.
    #[test]
    fn plan_search_shapes_request_for_each_planning_input() {
        // Oversized limit: capped to 200, pool is 5x the limit, vector pool 2x the pool.
        let mut request = search_request("  vector memory  ");
        request.limit = 500;
        request.filters.min_score = 0.42;
        let plan = plan_search(request, SearchMode::Hybrid).unwrap();
        assert_eq!(plan.mode, SearchMode::Hybrid);
        assert_eq!(plan.query, "vector memory");
        assert_eq!(plan.limit, 200);
        assert_eq!(plan.pool, 1000);
        assert_eq!(plan.vector_pool, 2000);
        assert_eq!(plan.min_score, 0.42);

        // Tiny limit: the pool never drops below 50.
        let mut request = search_request("tiny pool");
        request.limit = 1;
        let plan = plan_search(request, SearchMode::Fts).unwrap();
        assert_eq!(plan.mode, SearchMode::Fts);
        assert_eq!(plan.limit, 1);
        assert_eq!(plan.pool, 50);
        assert_eq!(plan.vector_pool, 100);

        // Filters on the request end up in the plan's predicate.
        let mut request = search_request("filtered");
        request.filters.project = Some(ProjectFilter::Contains("/Users/me/pond".to_owned()));
        request.filters.role = Some("assistant".to_owned());
        let plan = plan_search(request, SearchMode::Fts).unwrap();
        let sql = plan.filter.to_lance();
        assert!(sql.contains("project LIKE"));
        assert!(sql.contains("role = 'assistant'"));
    }

    /// A blank query, a zero limit and an unknown namespace each fail during
    /// planning with the matching error details.
    #[test]
    fn plan_search_rejects_invalid_composition_before_execution() {
        let mut blank = search_request("   ");
        let error = plan_search(blank.clone(), SearchMode::Fts)
            .unwrap_err()
            .error;
        assert_eq!(error.code, crate::wire::ErrorCode::ValidationFailed);
        assert_eq!(error.details["field"], "query");

        blank.query = "valid".to_owned();
        blank.limit = 0;
        let error = plan_search(blank.clone(), SearchMode::Fts)
            .unwrap_err()
            .error;
        assert_eq!(error.details["field"], "limit");

        blank.limit = 1;
        blank.namespace = Some("remote".to_owned());
        let error = plan_search(blank, SearchMode::Fts).unwrap_err().error;
        assert_eq!(error.code, crate::wire::ErrorCode::NamespaceUnknown);
        assert_eq!(error.details["namespace"], "remote");
    }
}