1use std::convert::Infallible;
43use std::net::SocketAddr;
44use std::str::FromStr;
45use std::sync::Arc;
46use std::time::Duration;
47
48use axum::extract::{FromRequestParts, Path, Query, State};
49use axum::http::request::Parts;
50use axum::http::{HeaderValue, Method, StatusCode};
51use axum::response::sse::{Event, KeepAlive, Sse};
52use axum::response::{IntoResponse, Response};
53use axum::routing::{get, post};
54use axum::{Json, Router};
55use futures::Stream;
56use serde::{Deserialize, Serialize};
57use solo_core::{
58 Confidence, DocumentId, EncodingContext, Episode, InvalidateEvent, MemoryId, TenantId,
59 Tier,
60};
61use solo_storage::{TenantHandle, TenantRegistry};
62use tokio::sync::broadcast;
63use tower_http::cors::{AllowOrigin, CorsLayer};
64use tower_http::trace::TraceLayer;
65
66use crate::auth::{AuthConfig, AuthenticatedPrincipal, middleware::AuthValidator};
67
68#[derive(Clone)]
72pub struct SoloHttpState {
73 pub registry: Arc<TenantRegistry>,
75 pub default_tenant: TenantId,
78 pub user_aliases: Arc<Vec<String>>,
85}
86
/// Request header through which a caller selects the tenant a call targets.
///
/// Only consulted when the authenticated principal (if any) carries no tenant
/// claim of its own; must hold a valid `TenantId`.
pub const TENANT_HEADER: &str = "x-solo-tenant";
90
91pub struct TenantExtractor(pub Arc<TenantHandle>);
107
108impl<S> FromRequestParts<S> for TenantExtractor
109where
110 SoloHttpState: FromRef<S>,
111 S: Send + Sync,
112{
113 type Rejection = ApiError;
114
115 async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
116 let state = SoloHttpState::from_ref(state);
117 let resolved = if let Some(principal) = parts.extensions.get::<AuthenticatedPrincipal>()
124 && let Some(claim) = principal.tenant_claim.clone()
125 {
126 claim
127 } else {
128 match parts.headers.get(TENANT_HEADER) {
129 None => state.default_tenant.clone(),
130 Some(raw) => {
131 let s = raw.to_str().map_err(|e| {
132 ApiError::bad_request(format!(
133 "{TENANT_HEADER}: header value must be ASCII ({e})"
134 ))
135 })?;
136 TenantId::new(s.to_string()).map_err(|e| {
137 ApiError::bad_request(format!("{TENANT_HEADER}: invalid tenant id: {e}"))
138 })?
139 }
140 }
141 };
142 let handle = state.registry.get_or_open(&resolved).await.map_err(|e| {
143 use solo_core::Error;
145 match &e {
146 Error::NotFound(_) => ApiError::not_found(e.to_string()),
147 Error::InvalidInput(_) => ApiError::bad_request(e.to_string()),
148 _ => ApiError::internal(e.to_string()),
149 }
150 })?;
151 Ok(TenantExtractor(handle))
152 }
153}
154
155use axum::extract::FromRef;
156
157pub struct AuditPrincipal(pub Option<String>);
162
163impl<S> FromRequestParts<S> for AuditPrincipal
164where
165 S: Send + Sync,
166{
167 type Rejection = std::convert::Infallible;
168
169 async fn from_request_parts(
170 parts: &mut Parts,
171 _state: &S,
172 ) -> Result<Self, Self::Rejection> {
173 Ok(AuditPrincipal(
174 parts
175 .extensions
176 .get::<AuthenticatedPrincipal>()
177 .map(|p| p.subject.clone()),
178 ))
179 }
180}
181
182pub struct MaybePrincipal(pub Option<AuthenticatedPrincipal>);
195
196impl<S> FromRequestParts<S> for MaybePrincipal
197where
198 S: Send + Sync,
199{
200 type Rejection = std::convert::Infallible;
201
202 async fn from_request_parts(
203 parts: &mut Parts,
204 _state: &S,
205 ) -> Result<Self, Self::Rejection> {
206 Ok(MaybePrincipal(
207 parts
208 .extensions
209 .get::<AuthenticatedPrincipal>()
210 .cloned(),
211 ))
212 }
213}
214
215pub fn router_with_auth(state: SoloHttpState, bearer_token: Option<String>) -> Router {
224 let auth = bearer_token.map(|token| AuthConfig::Bearer { token });
225 router_with_auth_config(state, auth)
226}
227
228pub fn router_with_auth_config(state: SoloHttpState, auth: Option<AuthConfig>) -> Router {
239 let cors = build_cors_layer();
240 let public = Router::new()
248 .route("/health", get(|| async { "ok" }))
249 .route("/openapi.json", get(openapi_handler));
250
251 let authed = Router::new()
252 .route("/memory", post(remember_handler))
253 .route("/memory/search", post(recall_handler))
254 .route("/memory/consolidate", post(consolidate_handler))
255 .route("/memory/{id}", get(inspect_handler).delete(forget_handler))
256 .route("/backup", post(backup_handler))
257 .route("/memory/themes", get(themes_handler))
261 .route("/memory/facts_about", get(facts_about_handler))
262 .route("/memory/contradictions", get(contradictions_handler))
263 .route(
268 "/memory/clusters/{cluster_id}",
269 get(inspect_cluster_handler),
270 )
271 .route(
278 "/memory/documents/search",
279 post(search_docs_handler),
280 )
281 .route(
282 "/memory/documents",
283 post(ingest_document_handler).get(list_documents_handler),
284 )
285 .route(
286 "/memory/documents/{id}",
287 get(inspect_document_handler).delete(forget_document_handler),
288 )
289 .route("/v1/graph/expand", get(graph_expand_handler))
294 .route("/v1/graph/nodes", get(graph_nodes_handler))
298 .route("/v1/graph/edges", get(graph_edges_handler))
299 .route("/v1/graph/inspect/{id}", get(graph_inspect_handler))
302 .route("/v1/graph/neighbors/{id}", get(graph_neighbors_handler))
306 .route("/v1/graph/stream", get(graph_stream_handler))
313 .route("/v1/tenants", get(tenants_list_handler))
322 .with_state(state.clone());
323
324 let authed = if let Some(cfg) = auth {
325 let validator = Arc::new(AuthValidator::from_config(
329 &cfg,
330 state.default_tenant.clone(),
331 ));
332 authed.layer(axum::middleware::from_fn_with_state(
333 validator,
334 crate::auth::middleware::auth_middleware,
335 ))
336 } else {
337 authed
338 };
339
340 public
341 .merge(authed)
342 .layer(cors)
343 .layer(TraceLayer::new_for_http())
344}
345
346pub fn router(state: SoloHttpState) -> Router {
348 router_with_auth_config(state, None)
349}
350
351fn build_cors_layer() -> CorsLayer {
352 CorsLayer::new()
366 .allow_origin(AllowOrigin::predicate(|origin: &HeaderValue, _req| {
367 origin
368 .to_str()
369 .map(is_localhost_origin)
370 .unwrap_or(false)
371 }))
372 .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
373 .allow_headers([
374 axum::http::header::CONTENT_TYPE,
375 axum::http::header::AUTHORIZATION,
376 ])
377}
378
/// Returns `true` when `origin`, the raw value of an HTTP `Origin` header,
/// names a loopback host over `http` or `https`.
///
/// The accepted hosts are `localhost`, `127.0.0.1` and `[::1]`. An optional
/// numeric port may follow. Everything else is rejected so the CORS
/// predicate fails closed. This includes other schemes, other hosts, the
/// opaque `null` origin, and malformed authorities such as
/// `http://[::1]evil` or `http://localhost:abc`.
fn is_localhost_origin(origin: &str) -> bool {
    let Some(rest) = origin
        .strip_prefix("http://")
        .or_else(|| origin.strip_prefix("https://"))
    else {
        return false;
    };
    // A serialized origin never carries a path, but tolerate one defensively.
    let authority = rest.split('/').next().unwrap_or(rest);

    // Split into host and optional port. Bracketed IPv6 literals contain ':'
    // themselves, so they are delimited by the closing ']' instead.
    let (host, port) = if authority.starts_with('[') {
        let Some(close) = authority.find(']') else {
            return false;
        };
        // `]` is ASCII, so `close + 1` is always a char boundary.
        let (host, tail) = authority.split_at(close + 1);
        if tail.is_empty() {
            (host, None)
        } else {
            // Anything after `]` other than `:<port>` is a malformed authority.
            match tail.strip_prefix(':') {
                Some(port) => (host, Some(port)),
                None => return false,
            }
        }
    } else {
        match authority.split_once(':') {
            Some((host, port)) => (host, Some(port)),
            None => (authority, None),
        }
    };

    let port_ok = match port {
        None => true,
        Some(p) => !p.is_empty() && p.bytes().all(|b| b.is_ascii_digit()),
    };
    port_ok && matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
408
409pub async fn serve_http(
415 addr: SocketAddr,
416 state: SoloHttpState,
417 bearer_token: Option<String>,
418 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
419) -> std::io::Result<()> {
420 let auth = bearer_token.map(|token| AuthConfig::Bearer { token });
421 serve_http_with_auth_config(addr, state, auth, shutdown).await
422}
423
424pub async fn serve_http_with_auth_config(
428 addr: SocketAddr,
429 state: SoloHttpState,
430 auth: Option<AuthConfig>,
431 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
432) -> std::io::Result<()> {
433 let auth_kind = match &auth {
434 Some(AuthConfig::Bearer { .. }) => "bearer",
435 Some(AuthConfig::Oidc { .. }) => "oidc",
436 None => "none",
437 };
438 let app = router_with_auth_config(state, auth);
439 let listener = tokio::net::TcpListener::bind(addr).await?;
440 tracing::info!(%addr, auth = auth_kind, "solo http: listening");
441 axum::serve(listener, app)
442 .with_graceful_shutdown(shutdown)
443 .await
444}
445
446async fn openapi_handler() -> Json<serde_json::Value> {
460 Json(openapi_spec())
461}
462
463pub fn openapi_spec() -> serde_json::Value {
467 serde_json::json!({
468 "openapi": "3.1.0",
469 "info": {
470 "title": "Solo HTTP API",
471 "description":
472 "Local-first personal memory daemon. The HTTP transport \
473 mirrors the four MCP tools (memory_remember / recall / \
474 inspect / forget). Default deployment is loopback-only \
475 (127.0.0.1); LAN-bound deployments require a bearer \
476 token via `solo http-serve --bind <ip> --bearer-token-file <path>`.",
477 "version": env!("CARGO_PKG_VERSION"),
478 "license": { "name": "Apache-2.0" }
479 },
480 "servers": [
481 { "url": "http://127.0.0.1:7437", "description": "Default loopback (replace port with your --http-port)" }
482 ],
483 "components": {
484 "securitySchemes": {
485 "bearerAuth": {
486 "type": "http",
487 "scheme": "bearer",
488 "description":
489 "Bearer-token auth. Required only on LAN-bound deployments \
490 (`solo http-serve --bind <non-loopback> --bearer-token-file <path>`); \
491 the default `127.0.0.1` deployment is unauthenticated. \
492 `GET /health` and `GET /openapi.json` are exempt from auth even \
493 on bearer-protected instances."
494 }
495 },
496 "schemas": {
497 "RememberRequest": {
498 "type": "object",
499 "required": ["content"],
500 "properties": {
501 "content": { "type": "string", "minLength": 1, "description": "Episode content to embed + store." },
502 "source_type": { "type": "string", "description": "Free-form source tag (e.g. `user_message`, `tool_output`). Defaults to `user_message`." },
503 "source_id": { "type": "string", "description": "Optional upstream ID for traceability." }
504 },
505 "additionalProperties": false
506 },
507 "RememberResponse": {
508 "type": "object",
509 "required": ["memory_id"],
510 "properties": {
511 "memory_id": { "type": "string", "format": "uuid", "description": "UUID v7 assigned to the new episode." }
512 }
513 },
514 "RecallRequest": {
515 "type": "object",
516 "required": ["query"],
517 "properties": {
518 "query": { "type": "string", "minLength": 1, "description": "Natural-language query; embedded by the same model as stored episodes." },
519 "limit": { "type": "integer", "minimum": 1, "maximum": 50, "default": 5, "description": "Max number of hits to return." }
520 },
521 "additionalProperties": false
522 },
523 "RecallResult": {
524 "type": "object",
525 "description":
526 "Recall response. Fields are stable across v0.1 but not exhaustively documented here — \
527 see `solo_query::RecallResult` in the source for the canonical shape. \
528 Treat as a forward-compatible JSON object.",
529 "additionalProperties": true
530 },
531 "ConsolidationScope": {
532 "type": "object",
533 "description": "Filter + flags for consolidation. All fields optional; empty body = unbounded defaults.",
534 "properties": {
535 "window_days": { "type": "integer", "nullable": true, "description": "Restrict to memories with ts_ms >= now - window_days * 86400000. Null/omitted = unbounded." },
536 "force_merge": { "type": "boolean", "default": false, "description": "Run the existing-vs-existing merge + abstraction-regen passes even with zero unclustered candidates. Drift catch-up on quiet corpora. Added in 0.3.1." }
537 },
538 "additionalProperties": false
539 },
540 "ConsolidationReport": {
541 "type": "object",
542 "required": [
543 "episodes_seen", "clusters_built", "clusters_merged",
544 "clusters_absorbed", "existing_clusters_merged",
545 "episodes_clustered", "abstractions_built",
546 "abstractions_regenerated", "triples_built",
547 "contradictions_found"
548 ],
549 "properties": {
550 "episodes_seen": { "type": "integer", "minimum": 0 },
551 "clusters_built": { "type": "integer", "minimum": 0, "description": "Brand-new clusters that survived to be persisted (post in-run-merge, post cross-run-absorb)." },
552 "clusters_merged": { "type": "integer", "minimum": 0, "description": "In-run merge: clusters absorbed into a sibling within this consolidate run (cross-UTC-bucket case). Counts losers." },
553 "clusters_absorbed": { "type": "integer", "minimum": 0, "description": "Cross-run absorb: freshly-built clusters folded into a pre-existing DB cluster with a similar centroid. Counts new-side clusters." },
554 "existing_clusters_merged": { "type": "integer", "minimum": 0, "description": "Existing-vs-existing merge: pre-existing DB clusters that drifted toward each other and now coalesce. Counts losers." },
555 "episodes_clustered": { "type": "integer", "minimum": 0 },
556 "abstractions_built": { "type": "integer", "minimum": 0, "description": "Fresh abstractions persisted for newly-built clusters. 0 when no LlmClient is wired." },
557 "abstractions_regenerated": { "type": "integer", "minimum": 0, "description": "Existing clusters whose stale abstractions were dropped and rebuilt because absorb or existing-merge changed their episode set. 0 without an LlmClient." },
558 "triples_built": { "type": "integer", "minimum": 0 },
559 "contradictions_found": { "type": "integer", "minimum": 0 }
560 }
561 },
562 "EpisodeRecord": {
563 "type": "object",
564 "description":
565 "Inspect response: full episode record. Fields are stable across v0.1 but not \
566 exhaustively documented here — see `solo_query::EpisodeRecord` in the source. \
567 Treat as a forward-compatible JSON object.",
568 "additionalProperties": true
569 },
570 "ThemeHit": {
571 "type": "object",
572 "description":
573 "One cluster + its (optional) abstraction. Returned by GET /memory/themes. \
574 See `solo_query::ThemeHit` for the canonical shape: cluster_id, \
575 abstraction_id?, abstraction_text?, episode_count, coherence, created_at_ms.",
576 "additionalProperties": true
577 },
578 "FactHit": {
579 "type": "object",
580 "description":
581 "One Steward-extracted SPO triple. Returned by GET /memory/facts_about. \
582 See `solo_query::FactHit` for fields: triple_id, subject_id, predicate, \
583 object_id, object_kind, valid_from_ms, valid_to_ms?, confidence, cluster_id?.",
584 "additionalProperties": true
585 },
586 "ContradictionHit": {
587 "type": "object",
588 "description":
589 "One Steward-flagged contradiction with each side's triple LEFT JOIN'd in. \
590 Returned by GET /memory/contradictions. See `solo_query::ContradictionHit`: \
591 a_id, b_id, kind, explanation, detected_at_ms, a_triple?, b_triple?.",
592 "additionalProperties": true
593 },
594 "ClusterRecord": {
595 "type": "object",
596 "description":
597 "Snapshot of one cluster — its row, optional abstraction, and source episodes \
598 (content truncated to 200 chars unless ?full_content=true). Returned by \
599 GET /memory/clusters/{cluster_id}. See `solo_query::ClusterRecord`.",
600 "additionalProperties": true
601 },
602 "IngestDocumentRequest": {
603 "type": "object",
604 "required": ["path"],
605 "properties": {
606 "path": {
607 "type": "string",
608 "minLength": 1,
609 "description":
610 "Server-side absolute path to the file to ingest. The file must be \
611 readable by the Solo process. Supported formats: plaintext / \
612 markdown / code, HTML, PDF."
613 }
614 },
615 "additionalProperties": false
616 },
617 "IngestReport": {
618 "type": "object",
619 "description":
620 "Returned by POST /memory/documents. Reports the document id assigned, \
621 the number of chunks persisted + embedded, the total byte size, and a \
622 `deduped` flag (true when the same content_hash was already present and \
623 the existing doc_id was returned unchanged). See `solo_storage::IngestReport`.",
624 "required": ["doc_id", "chunks_persisted", "bytes_ingested", "deduped"],
625 "properties": {
626 "doc_id": { "type": "string", "format": "uuid" },
627 "chunks_persisted": { "type": "integer", "minimum": 0 },
628 "bytes_ingested": { "type": "integer", "minimum": 0, "format": "int64" },
629 "deduped": { "type": "boolean" }
630 },
631 "additionalProperties": false
632 },
633 "ForgetDocumentReport": {
634 "type": "object",
635 "description":
636 "Returned by DELETE /memory/documents/{id}. Reports the doc_id soft-deleted \
637 and how many chunk rowids were tombstoned in the HNSW index. The chunk rows \
638 themselves survive in SQL for forensic value. See `solo_storage::ForgetDocumentReport`.",
639 "required": ["doc_id", "chunks_tombstoned"],
640 "properties": {
641 "doc_id": { "type": "string", "format": "uuid" },
642 "chunks_tombstoned": { "type": "integer", "minimum": 0 }
643 },
644 "additionalProperties": false
645 },
646 "SearchDocsRequest": {
647 "type": "object",
648 "required": ["query"],
649 "properties": {
650 "query": { "type": "string", "minLength": 1 },
651 "limit": { "type": "integer", "minimum": 1, "maximum": 100, "default": 5 }
652 },
653 "additionalProperties": false
654 },
655 "DocSearchHit": {
656 "type": "object",
657 "description":
658 "One chunk hit + parent-doc context. Fields per `solo_query::DocSearchHit`: \
659 chunk_id, doc_id, doc_title?, doc_source?, doc_mime_type?, chunk_index, \
660 content, cos_distance, start_offset, end_offset.",
661 "additionalProperties": true
662 },
663 "DocumentInspectResult": {
664 "type": "object",
665 "description":
666 "Returned by GET /memory/documents/{id}. A `document` record (full metadata) \
667 plus an ordered list of chunk summaries (each preview truncated to 200 \
668 chars). See `solo_query::DocumentInspectResult`.",
669 "additionalProperties": true
670 },
671 "DocumentSummary": {
672 "type": "object",
673 "description":
674 "One row from GET /memory/documents. Fields per `solo_query::DocumentSummary`: \
675 doc_id, title?, source?, mime_type?, ingested_at_ms, chunk_count, status.",
676 "additionalProperties": true
677 },
678 "ApiError": {
679 "type": "object",
680 "required": ["error", "status"],
681 "properties": {
682 "error": { "type": "string" },
683 "status": { "type": "integer", "minimum": 400, "maximum": 599 }
684 }
685 }
686 }
687 },
688 "paths": {
689 "/health": {
690 "get": {
691 "summary": "Liveness probe",
692 "description": "Returns plain text `ok`. Always unauthenticated.",
693 "responses": {
694 "200": {
695 "description": "Server is up.",
696 "content": { "text/plain": { "schema": { "type": "string", "example": "ok" } } }
697 }
698 }
699 }
700 },
701 "/openapi.json": {
702 "get": {
703 "summary": "Self-describing OpenAPI 3.1 spec",
704 "description": "Returns this document. Always unauthenticated.",
705 "responses": {
706 "200": {
707 "description": "OpenAPI 3.1 document.",
708 "content": { "application/json": { "schema": { "type": "object" } } }
709 }
710 }
711 }
712 },
713 "/memory": {
714 "post": {
715 "summary": "Remember (store an episode)",
716 "description": "Equivalent to MCP tool `memory_remember`.",
717 "security": [{ "bearerAuth": [] }, {}],
718 "requestBody": {
719 "required": true,
720 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/RememberRequest" } } }
721 },
722 "responses": {
723 "200": {
724 "description": "Memory stored; returns the new MemoryId.",
725 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/RememberResponse" } } }
726 },
727 "400": { "description": "Bad request (e.g. empty content).", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
728 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
729 }
730 }
731 },
732 "/memory/search": {
733 "post": {
734 "summary": "Recall (vector search)",
735 "description": "Equivalent to MCP tool `memory_recall`. Embeds the query, runs HNSW search, returns the top-K hits in cosine-distance order.",
736 "security": [{ "bearerAuth": [] }, {}],
737 "requestBody": {
738 "required": true,
739 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/RecallRequest" } } }
740 },
741 "responses": {
742 "200": {
743 "description": "Search results.",
744 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/RecallResult" } } }
745 },
746 "400": { "description": "Bad request (e.g. empty query).", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
747 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
748 }
749 }
750 },
751 "/memory/consolidate": {
752 "post": {
753 "summary": "Run a consolidation pass (clustering + abstraction)",
754 "description":
755 "Idempotent. Triggers the SWS-equivalent clustering pass; if a `Steward` LLM is wired \
756 on the server, also runs the REM-equivalent abstraction pass that populates \
757 `semantic_abstractions` and `triples`. Empty request body = default scope (unbounded \
758 window). Equivalent to the `solo consolidate` CLI.",
759 "security": [{ "bearerAuth": [] }, {}],
760 "requestBody": {
761 "required": false,
762 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ConsolidationScope" } } }
763 },
764 "responses": {
765 "200": {
766 "description": "Consolidation complete; report counts the work done.",
767 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ConsolidationReport" } } }
768 },
769 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
770 }
771 }
772 },
773 "/backup": {
774 "post": {
775 "summary": "Online encrypted backup",
776 "description":
777 "Run an online SQLCipher backup of the live data dir to a server-side path. \
778 The destination file is encrypted with the same Argon2id-derived raw key as \
779 the source, so it restores under the same passphrase + a copy of the source's \
780 `solo.config.toml`. Hot — the backup runs against the writer's existing \
781 connection without taking the lockfile, so the daemon keeps serving reads + \
782 writes during the operation. v0.3.2+.",
783 "security": [{ "bearerAuth": [] }, {}],
784 "requestBody": {
785 "required": true,
786 "content": { "application/json": { "schema": {
787 "type": "object",
788 "properties": {
789 "to": { "type": "string", "description": "Server-side absolute path for the backup file." },
790 "force": { "type": "boolean", "description": "Overwrite an existing destination file. Default false.", "default": false }
791 },
792 "required": ["to"]
793 } } }
794 },
795 "responses": {
796 "200": {
797 "description": "Backup complete; reports the destination path + elapsed milliseconds.",
798 "content": { "application/json": { "schema": {
799 "type": "object",
800 "properties": {
801 "path": { "type": "string" },
802 "elapsed_ms": { "type": "integer", "format": "int64" }
803 }
804 } } }
805 },
806 "400": { "description": "Destination invalid, exists without force, or its parent doesn't exist." },
807 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." },
808 "500": { "description": "Backup failed (disk full, permission denied, etc.)." }
809 }
810 }
811 },
812 "/memory/{id}": {
813 "get": {
814 "summary": "Inspect a memory by ID",
815 "description": "Equivalent to MCP tool `memory_inspect`.",
816 "security": [{ "bearerAuth": [] }, {}],
817 "parameters": [{
818 "name": "id",
819 "in": "path",
820 "required": true,
821 "schema": { "type": "string", "format": "uuid" },
822 "description": "MemoryId (UUID v7)."
823 }],
824 "responses": {
825 "200": {
826 "description": "Episode record.",
827 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/EpisodeRecord" } } }
828 },
829 "400": { "description": "Malformed ID.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
830 "404": { "description": "No such memory.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
831 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
832 }
833 },
834 "delete": {
835 "summary": "Forget (soft-delete) a memory by ID",
836 "description":
837 "Equivalent to MCP tool `memory_forget`. Soft-delete: flips `episodes.status = 'forgotten'` \
838 and tombstones the HNSW vector. The row + embedding are preserved for forensics; \
839 re-running `solo reembed` after this does NOT restore visibility.",
840 "security": [{ "bearerAuth": [] }, {}],
841 "parameters": [
842 { "name": "id", "in": "path", "required": true, "schema": { "type": "string", "format": "uuid" } },
843 { "name": "reason", "in": "query", "required": false, "schema": { "type": "string" }, "description": "Free-form reason logged via tracing (not yet persisted to the DB)." }
844 ],
845 "responses": {
846 "204": { "description": "Forgotten (or already forgotten — idempotent)." },
847 "400": { "description": "Malformed ID.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
848 "404": { "description": "No such memory.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
849 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
850 }
851 }
852 },
853 "/memory/themes": {
854 "get": {
855 "summary": "List recent cluster themes",
856 "description":
857 "Equivalent to MCP tool `memory_themes`. List cluster abstractions ordered by \
858 most-recent first. Use to surface 'what has the user been thinking about lately' \
859 without paging through individual episodes. v0.4.0+.",
860 "security": [{ "bearerAuth": [] }, {}],
861 "parameters": [
862 { "name": "window_days", "in": "query", "required": false, "schema": { "type": "integer", "minimum": 1 }, "description": "Optional time window. Omit for unfiltered (all-time, most-recent first)." },
863 { "name": "limit", "in": "query", "required": false, "schema": { "type": "integer", "minimum": 1, "maximum": 100, "default": 5 } }
864 ],
865 "responses": {
866 "200": {
867 "description": "Array of ThemeHits (possibly empty).",
868 "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/ThemeHit" } } } }
869 },
870 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
871 }
872 }
873 },
874 "/memory/facts_about": {
875 "get": {
876 "summary": "Query the SPO knowledge graph by subject",
877 "description":
878 "Equivalent to MCP tool `memory_facts_about`. Query Steward-extracted triples by \
879 subject + optional predicate + optional time window. Subject is required \
880 (predicate-only scans not supported). Pass `include_as_object=true` (v0.5.1+) \
881 to also surface rows where `subject` appears as the object. v0.4.0+.",
882 "security": [{ "bearerAuth": [] }, {}],
883 "parameters": [
884 { "name": "subject", "in": "query", "required": true, "schema": { "type": "string", "minLength": 1 }, "description": "Subject id to query (e.g. `Sam`)." },
885 { "name": "predicate", "in": "query", "required": false, "schema": { "type": "string" }, "description": "Optional predicate filter (e.g. `works_at`)." },
886 { "name": "since_ms", "in": "query", "required": false, "schema": { "type": "integer" }, "description": "Optional valid_from_ms lower bound (epoch ms)." },
887 { "name": "until_ms", "in": "query", "required": false, "schema": { "type": "integer" }, "description": "Optional valid_to_ms upper bound (epoch ms). NULL upper bounds (still-valid facts) pass through." },
888 { "name": "include_as_object", "in": "query", "required": false, "schema": { "type": "boolean", "default": false }, "description": "If true, also match rows where `subject` appears as the object (e.g. surface 'Sam pushes back on PRs about Maya' under subject='Maya'). Default false. v0.5.1+." },
889 { "name": "limit", "in": "query", "required": false, "schema": { "type": "integer", "minimum": 1, "maximum": 100, "default": 5 } }
890 ],
891 "responses": {
892 "200": {
893 "description": "Array of FactHits (possibly empty).",
894 "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/FactHit" } } } }
895 },
896 "400": { "description": "Bad request (e.g. empty subject).", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
897 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
898 }
899 }
900 },
901 "/memory/contradictions": {
902 "get": {
903 "summary": "List Steward-flagged contradictions",
904 "description":
905 "Equivalent to MCP tool `memory_contradictions`. Each result includes both \
906 sides' triple SPO via LEFT JOIN for context. v0.4.0+.",
907 "security": [{ "bearerAuth": [] }, {}],
908 "parameters": [
909 { "name": "limit", "in": "query", "required": false, "schema": { "type": "integer", "minimum": 1, "maximum": 100, "default": 5 } }
910 ],
911 "responses": {
912 "200": {
913 "description": "Array of ContradictionHits (possibly empty).",
914 "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/ContradictionHit" } } } }
915 },
916 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
917 }
918 }
919 },
920 "/memory/clusters/{cluster_id}": {
921 "get": {
922 "summary": "Inspect a single cluster",
923 "description":
924 "Equivalent to MCP tool `memory_inspect_cluster`. Returns the cluster row, \
925 its (optional) abstraction, and its source episodes. By default each \
926 episode's `content` is truncated to 200 chars with a trailing `…`. Pass \
927 `?full_content=true` to get verbatim episode content. v0.5.0+.",
928 "security": [{ "bearerAuth": [] }, {}],
929 "parameters": [
930 { "name": "cluster_id", "in": "path", "required": true, "schema": { "type": "string", "minLength": 1 }, "description": "Cluster id (from a previous GET /memory/themes response)." },
931 { "name": "full_content", "in": "query", "required": false, "schema": { "type": "boolean", "default": false }, "description": "If true, return episode content verbatim. Default false (truncate to 200 chars + ellipsis)." }
932 ],
933 "responses": {
934 "200": {
935 "description": "Cluster snapshot.",
936 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ClusterRecord" } } }
937 },
938 "400": { "description": "Bad request (e.g. empty cluster_id).", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
939 "404": { "description": "No such cluster.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
940 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
941 }
942 }
943 },
944 "/memory/documents": {
945 "post": {
946 "summary": "Ingest a document",
947 "description":
948 "Equivalent to MCP tool `memory_ingest_document`. Reads the file at the \
949 supplied server-side path, parses + chunks + embeds, and persists under \
950 `documents` + `document_chunks`. Returns the new doc_id, chunk count, and \
951 a `deduped` flag (true when an existing document with the same content_hash \
952 was returned without re-embedding). v0.7.0+.",
953 "security": [{ "bearerAuth": [] }, {}],
954 "requestBody": {
955 "required": true,
956 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/IngestDocumentRequest" } } }
957 },
958 "responses": {
959 "200": {
960 "description": "Document ingested (or deduplicated).",
961 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/IngestReport" } } }
962 },
963 "400": { "description": "Bad request (e.g. empty path, file unreadable, parse error).", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
964 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
965 }
966 },
967 "get": {
968 "summary": "List ingested documents (paginated)",
969 "description":
970 "Equivalent to MCP tool `memory_list_documents`. Returns a paginated index, \
971 newest first. Forgotten documents are hidden by default; pass \
972 `?include_forgotten=true` to see them too. v0.7.0+.",
973 "security": [{ "bearerAuth": [] }, {}],
974 "parameters": [
975 { "name": "limit", "in": "query", "required": false, "schema": { "type": "integer", "minimum": 1, "maximum": 100, "default": 20 } },
976 { "name": "offset", "in": "query", "required": false, "schema": { "type": "integer", "minimum": 0, "default": 0 } },
977 { "name": "include_forgotten", "in": "query", "required": false, "schema": { "type": "boolean", "default": false } }
978 ],
979 "responses": {
980 "200": {
981 "description": "Array of DocumentSummary (possibly empty).",
982 "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/DocumentSummary" } } } }
983 },
984 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
985 }
986 }
987 },
988 "/memory/documents/search": {
989 "post": {
990 "summary": "Vector search across document chunks",
991 "description":
992 "Equivalent to MCP tool `memory_search_docs`. Embeds the query and returns \
993 up to `limit` matching chunks, best match first, each annotated with the \
994 parent document's title + source path. Forgotten documents are excluded. \
995 v0.7.0+.",
996 "security": [{ "bearerAuth": [] }, {}],
997 "requestBody": {
998 "required": true,
999 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/SearchDocsRequest" } } }
1000 },
1001 "responses": {
1002 "200": {
1003 "description": "Array of DocSearchHits (possibly empty).",
1004 "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/DocSearchHit" } } } }
1005 },
1006 "400": { "description": "Bad request (e.g. empty query).", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
1007 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
1008 }
1009 }
1010 },
1011 "/memory/documents/{id}": {
1012 "get": {
1013 "summary": "Inspect one document",
1014 "description":
1015 "Equivalent to MCP tool `memory_inspect_document`. Returns the document's \
1016 metadata plus a preview of every chunk (truncated to 200 chars). v0.7.0+.",
1017 "security": [{ "bearerAuth": [] }, {}],
1018 "parameters": [
1019 { "name": "id", "in": "path", "required": true, "schema": { "type": "string", "format": "uuid" }, "description": "DocumentId (UUID v7)." }
1020 ],
1021 "responses": {
1022 "200": {
1023 "description": "Document inspection result.",
1024 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/DocumentInspectResult" } } }
1025 },
1026 "400": { "description": "Malformed id.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
1027 "404": { "description": "No such document.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
1028 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
1029 }
1030 },
1031 "delete": {
1032 "summary": "Forget (soft-delete) one document",
1033 "description":
1034 "Equivalent to MCP tool `memory_forget_document`. Flips `documents.status` \
1035 to `forgotten` and tombstones every chunk's HNSW rowid. The chunk rows \
1036 survive in SQL for forensic value. v0.7.0+.",
1037 "security": [{ "bearerAuth": [] }, {}],
1038 "parameters": [
1039 { "name": "id", "in": "path", "required": true, "schema": { "type": "string", "format": "uuid" } }
1040 ],
1041 "responses": {
1042 "200": {
1043 "description": "Document soft-deleted; report counts chunks tombstoned.",
1044 "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ForgetDocumentReport" } } }
1045 },
1046 "400": { "description": "Malformed id.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
1047 "404": { "description": "No such document.", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ApiError" } } } },
1048 "401": { "description": "Missing or invalid bearer token (LAN-bound deployments only)." }
1049 }
1050 }
1051 }
1052 }
1053 })
1054}
1055
/// JSON request body accepted by `remember_handler`.
#[derive(Debug, Deserialize)]
struct RememberBody {
    /// Text to store. The handler trims trailing whitespace and rejects the
    /// request with 400 when nothing is left.
    content: String,
    /// Origin category for the new episode; `remember_handler` substitutes
    /// `"user_message"` when this is omitted.
    #[serde(default)]
    source_type: Option<String>,
    /// Optional caller-supplied origin identifier, copied unchanged into the
    /// episode's `source_id`.
    #[serde(default)]
    source_id: Option<String>,
}
1068
/// JSON response of `remember_handler`.
#[derive(Debug, Serialize)]
struct RememberResponse {
    /// String form (`to_string()`) of the id returned by `remember_as`.
    memory_id: String,
}
1073
1074async fn remember_handler(
1075 TenantExtractor(tenant): TenantExtractor,
1076 AuditPrincipal(principal): AuditPrincipal,
1077 Json(body): Json<RememberBody>,
1078) -> Result<Json<RememberResponse>, ApiError> {
1079 let content = body.content.trim_end().to_string();
1080 if content.is_empty() {
1081 return Err(ApiError::bad_request("content must not be empty"));
1082 }
1083 let embedding = tenant.embedder().embed(&content).await.map_err(ApiError::from)?;
1084 let episode = Episode {
1085 memory_id: MemoryId::new(),
1086 ts_ms: chrono::Utc::now().timestamp_millis(),
1087 source_type: body.source_type.unwrap_or_else(|| "user_message".into()),
1088 source_id: body.source_id,
1089 content,
1090 encoding_context: EncodingContext::default(),
1091 provenance: None,
1092 confidence: Confidence::new(0.9).unwrap(),
1093 strength: 0.5,
1094 salience: 0.5,
1095 tier: Tier::Hot,
1096 };
1097 let mid = tenant
1098 .write()
1099 .remember_as(principal, episode, embedding)
1100 .await
1101 .map_err(ApiError::from)?;
1102 Ok(Json(RememberResponse {
1103 memory_id: mid.to_string(),
1104 }))
1105}
1106
/// JSON request body accepted by `recall_handler`.
#[derive(Debug, Deserialize)]
struct RecallBody {
    /// Free-text query forwarded to `solo_query::run_recall`.
    query: String,
    /// Maximum number of results; defaults to `default_limit()` (5).
    #[serde(default = "default_limit")]
    limit: usize,
}
1113
/// Serde default for the `limit` field shared by the recall, themes, facts,
/// contradictions and document-search request types in this module.
fn default_limit() -> usize {
    5
}
1117
1118async fn recall_handler(
1119 TenantExtractor(tenant): TenantExtractor,
1120 AuditPrincipal(principal): AuditPrincipal,
1121 Json(body): Json<RecallBody>,
1122) -> Result<Json<solo_query::RecallResult>, ApiError> {
1123 let result = solo_query::run_recall(tenant.as_ref(), principal, &body.query, body.limit)
1127 .await
1128 .map_err(ApiError::from)?;
1129 Ok(Json(result))
1130}
1131
1132async fn inspect_handler(
1133 TenantExtractor(tenant): TenantExtractor,
1134 AuditPrincipal(principal): AuditPrincipal,
1135 Path(id): Path<String>,
1136) -> Result<Json<solo_query::EpisodeRecord>, ApiError> {
1137 let mid = MemoryId::from_str(&id)
1138 .map_err(|e| ApiError::bad_request(format!("invalid id: {e}")))?;
1139 let row = solo_query::inspect_one(tenant.read(), tenant.audit(), principal, mid)
1140 .await
1141 .map_err(ApiError::from)?;
1142 Ok(Json(row))
1143}
1144
/// Query-string parameters accepted by `themes_handler`.
#[derive(Debug, Deserialize)]
struct ThemesQuery {
    /// Optional window passed through to `solo_query::themes`; presumably a
    /// look-back in days (None = library default) — TODO confirm.
    #[serde(default)]
    window_days: Option<i64>,
    /// Maximum number of themes; defaults to `default_limit()` (5).
    #[serde(default = "default_limit")]
    limit: usize,
}
1158
1159async fn themes_handler(
1160 TenantExtractor(tenant): TenantExtractor,
1161 AuditPrincipal(principal): AuditPrincipal,
1162 Query(q): Query<ThemesQuery>,
1163) -> Result<Json<Vec<solo_query::ThemeHit>>, ApiError> {
1164 let hits = solo_query::themes(
1165 tenant.read(),
1166 tenant.audit(),
1167 principal,
1168 q.window_days,
1169 q.limit,
1170 )
1171 .await
1172 .map_err(ApiError::from)?;
1173 Ok(Json(hits))
1174}
1175
/// Query-string parameters accepted by `facts_about_handler`.
#[derive(Debug, Deserialize)]
struct FactsAboutQuery {
    /// Subject to look up; the handler rejects a blank value with 400.
    subject: String,
    /// Optional predicate filter forwarded to `solo_query::facts_about`.
    #[serde(default)]
    predicate: Option<String>,
    /// Optional lower time bound; presumably epoch milliseconds — TODO confirm.
    #[serde(default)]
    since_ms: Option<i64>,
    /// Optional upper time bound; presumably epoch milliseconds — TODO confirm.
    #[serde(default)]
    until_ms: Option<i64>,
    /// Flag forwarded to `solo_query::facts_about`; presumably also matches
    /// facts where `subject` appears as the object — verify there.
    #[serde(default)]
    include_as_object: bool,
    /// Maximum number of facts; defaults to `default_limit()` (5).
    #[serde(default = "default_limit")]
    limit: usize,
}
1192
1193async fn facts_about_handler(
1194 State(s): State<SoloHttpState>,
1195 TenantExtractor(tenant): TenantExtractor,
1196 AuditPrincipal(principal): AuditPrincipal,
1197 Query(q): Query<FactsAboutQuery>,
1198) -> Result<Json<Vec<solo_query::FactHit>>, ApiError> {
1199 if q.subject.trim().is_empty() {
1200 return Err(ApiError::bad_request("subject must not be empty"));
1201 }
1202 let hits = solo_query::facts_about(
1203 tenant.read(),
1204 tenant.audit(),
1205 principal,
1206 &q.subject,
1207 &s.user_aliases,
1208 q.include_as_object,
1209 q.predicate.as_deref(),
1210 q.since_ms,
1211 q.until_ms,
1212 q.limit,
1213 )
1214 .await
1215 .map_err(ApiError::from)?;
1216 Ok(Json(hits))
1217}
1218
/// Query-string parameters accepted by `contradictions_handler`.
#[derive(Debug, Deserialize)]
struct ContradictionsQuery {
    /// Maximum number of contradictions; defaults to `default_limit()` (5).
    #[serde(default = "default_limit")]
    limit: usize,
}
1224
1225async fn contradictions_handler(
1226 TenantExtractor(tenant): TenantExtractor,
1227 AuditPrincipal(principal): AuditPrincipal,
1228 Query(q): Query<ContradictionsQuery>,
1229) -> Result<Json<Vec<solo_query::ContradictionHit>>, ApiError> {
1230 let hits = solo_query::contradictions(tenant.read(), tenant.audit(), principal, q.limit)
1231 .await
1232 .map_err(ApiError::from)?;
1233 Ok(Json(hits))
1234}
1235
/// Query-string parameters accepted by `inspect_cluster_handler`.
#[derive(Debug, Deserialize, Default)]
struct InspectClusterQuery {
    /// Flag forwarded to `solo_query::inspect_cluster`; presumably requests
    /// untruncated member content — TODO confirm. Defaults to false.
    #[serde(default)]
    full_content: bool,
}
1244
1245async fn inspect_cluster_handler(
1246 TenantExtractor(tenant): TenantExtractor,
1247 AuditPrincipal(principal): AuditPrincipal,
1248 Path(cluster_id): Path<String>,
1249 Query(q): Query<InspectClusterQuery>,
1250) -> Result<Json<solo_query::ClusterRecord>, ApiError> {
1251 if cluster_id.trim().is_empty() {
1252 return Err(ApiError::bad_request("cluster_id must not be empty"));
1253 }
1254 let record = solo_query::inspect_cluster(
1255 tenant.read(),
1256 tenant.audit(),
1257 principal,
1258 &cluster_id,
1259 q.full_content,
1260 )
1261 .await
1262 .map_err(ApiError::from)?;
1263 Ok(Json(record))
1264}
1265
/// JSON request body accepted by `ingest_document_handler`.
#[derive(Debug, Deserialize)]
struct IngestDocumentBody {
    /// Server-side filesystem path of the file to ingest; a blank value is
    /// rejected with 400.
    path: String,
}
1276
1277async fn ingest_document_handler(
1278 TenantExtractor(tenant): TenantExtractor,
1279 AuditPrincipal(principal): AuditPrincipal,
1280 Json(body): Json<IngestDocumentBody>,
1281) -> Result<Json<solo_storage::IngestReport>, ApiError> {
1282 if body.path.trim().is_empty() {
1283 return Err(ApiError::bad_request("path must not be empty"));
1284 }
1285 let path = std::path::PathBuf::from(body.path);
1286 let chunk_config = solo_storage::document::ChunkConfig::default();
1287 let report = tenant
1288 .write()
1289 .ingest_document_as(principal, path, chunk_config)
1290 .await
1291 .map_err(ApiError::from)?;
1292 Ok(Json(report))
1293}
1294
/// JSON request body accepted by `search_docs_handler`.
#[derive(Debug, Deserialize)]
struct SearchDocsBody {
    /// Free-text query forwarded to `solo_query::run_doc_search`.
    query: String,
    /// Maximum number of chunk hits; defaults to `default_limit()` (5).
    #[serde(default = "default_limit")]
    limit: usize,
}
1301
1302async fn search_docs_handler(
1303 TenantExtractor(tenant): TenantExtractor,
1304 AuditPrincipal(principal): AuditPrincipal,
1305 Json(body): Json<SearchDocsBody>,
1306) -> Result<Json<Vec<solo_query::DocSearchHit>>, ApiError> {
1307 let hits = solo_query::run_doc_search(tenant.as_ref(), principal, &body.query, body.limit)
1308 .await
1309 .map_err(ApiError::from)?;
1310 Ok(Json(hits))
1311}
1312
1313async fn inspect_document_handler(
1314 TenantExtractor(tenant): TenantExtractor,
1315 AuditPrincipal(principal): AuditPrincipal,
1316 Path(id): Path<String>,
1317) -> Result<Json<solo_query::DocumentInspectResult>, ApiError> {
1318 let doc_id = DocumentId::from_str(&id)
1319 .map_err(|e| ApiError::bad_request(format!("invalid id: {e}")))?;
1320 let result_opt =
1321 solo_query::inspect_document(tenant.read(), tenant.audit(), principal, &doc_id)
1322 .await
1323 .map_err(ApiError::from)?;
1324 match result_opt {
1325 Some(record) => Ok(Json(record)),
1326 None => Err(ApiError::not_found(format!("document {doc_id} not found"))),
1327 }
1328}
1329
/// Query-string parameters accepted by `list_documents_handler`.
///
/// NOTE(review): the OpenAPI document advertises `limit` as 1..=100, but no
/// bound is applied here or in the handler — confirm that
/// `solo_query::list_documents` clamps it.
#[derive(Debug, Deserialize)]
struct ListDocumentsQuery {
    /// Page size; defaults to `default_list_documents_limit()` (20).
    #[serde(default = "default_list_documents_limit")]
    limit: usize,
    /// Number of rows to skip; defaults to 0.
    #[serde(default)]
    offset: usize,
    /// When true, forgotten documents are listed too; defaults to false.
    #[serde(default)]
    include_forgotten: bool,
}
1339
/// Serde default for `ListDocumentsQuery::limit`; matches the `default: 20`
/// advertised for `GET /memory/documents` in the OpenAPI document.
fn default_list_documents_limit() -> usize {
    20
}
1343
1344async fn list_documents_handler(
1345 TenantExtractor(tenant): TenantExtractor,
1346 AuditPrincipal(principal): AuditPrincipal,
1347 Query(q): Query<ListDocumentsQuery>,
1348) -> Result<Json<Vec<solo_query::DocumentSummary>>, ApiError> {
1349 let rows = solo_query::list_documents(
1350 tenant.read(),
1351 tenant.audit(),
1352 principal,
1353 q.limit,
1354 q.offset,
1355 q.include_forgotten,
1356 )
1357 .await
1358 .map_err(ApiError::from)?;
1359 Ok(Json(rows))
1360}
1361
1362async fn forget_document_handler(
1363 TenantExtractor(tenant): TenantExtractor,
1364 AuditPrincipal(principal): AuditPrincipal,
1365 Path(id): Path<String>,
1366) -> Result<Json<solo_storage::ForgetDocumentReport>, ApiError> {
1367 let doc_id = DocumentId::from_str(&id)
1368 .map_err(|e| ApiError::bad_request(format!("invalid id: {e}")))?;
1369 let report = tenant
1370 .write()
1371 .forget_document_as(principal, doc_id)
1372 .await
1373 .map_err(ApiError::from)?;
1374 Ok(Json(report))
1375}
1376
/// Query-string parameters accepted by `forget_handler`.
#[derive(Debug, Deserialize)]
struct ForgetQuery {
    /// Free-text reason recorded with the forget; `forget_handler` uses
    /// `"http"` when it is omitted.
    #[serde(default)]
    reason: Option<String>,
}
1382
1383async fn forget_handler(
1384 TenantExtractor(tenant): TenantExtractor,
1385 AuditPrincipal(principal): AuditPrincipal,
1386 Path(id): Path<String>,
1387 Query(q): Query<ForgetQuery>,
1388) -> Result<StatusCode, ApiError> {
1389 let mid = MemoryId::from_str(&id).map_err(|e| ApiError::bad_request(format!("invalid id: {e}")))?;
1390 let reason = q.reason.unwrap_or_else(|| "http".into());
1391 tenant
1392 .write()
1393 .forget_as(principal, mid, reason)
1394 .await
1395 .map_err(ApiError::from)?;
1396 Ok(StatusCode::NO_CONTENT)
1397}
1398
1399async fn consolidate_handler(
1400 TenantExtractor(tenant): TenantExtractor,
1401 AuditPrincipal(principal): AuditPrincipal,
1402 body: axum::body::Bytes,
1403) -> Result<Json<solo_storage::ConsolidationReport>, ApiError> {
1404 let scope = if body.is_empty() {
1410 solo_storage::ConsolidationScope::default()
1411 } else {
1412 serde_json::from_slice(&body)
1413 .map_err(|e| ApiError::bad_request(format!("invalid JSON: {e}")))?
1414 };
1415 let report = tenant
1416 .write()
1417 .consolidate_as(principal, scope)
1418 .await
1419 .map_err(ApiError::from)?;
1420 Ok(Json(report))
1421}
1422
/// JSON request body accepted by `backup_handler`.
#[derive(Debug, Deserialize)]
struct BackupBody {
    /// Server-side destination path for the backup file.
    to: String,
    /// When true, an existing file at `to` is removed before the backup runs;
    /// defaults to false, in which case an existing destination is a 400.
    #[serde(default)]
    force: bool,
}
1432
/// JSON response of `backup_handler`.
#[derive(Debug, Serialize)]
struct BackupResponse {
    /// Destination path, rendered with `Path::display`.
    path: String,
    /// Wall-clock time spent in the backup call, in milliseconds.
    elapsed_ms: u64,
}
1438
1439async fn backup_handler(
1440 TenantExtractor(tenant): TenantExtractor,
1441 Json(body): Json<BackupBody>,
1442) -> Result<Json<BackupResponse>, ApiError> {
1443 use std::path::PathBuf;
1444
1445 let dest = PathBuf::from(&body.to);
1446 if dest.as_os_str().is_empty() {
1447 return Err(ApiError::bad_request("`to` must not be empty"));
1448 }
1449 if solo_storage::paths_refer_to_same_file(tenant.db_path(), &dest) {
1452 return Err(ApiError::bad_request(format!(
1453 "destination {} is the same file as the source database; \
1454 refusing to run (would corrupt the live database)",
1455 dest.display()
1456 )));
1457 }
1458 if dest.exists() {
1459 if !body.force {
1460 return Err(ApiError::bad_request(format!(
1461 "destination {} exists; pass force=true to overwrite",
1462 dest.display()
1463 )));
1464 }
1465 std::fs::remove_file(&dest).map_err(|e| {
1466 ApiError::internal(format!(
1467 "remove existing destination {}: {e}",
1468 dest.display()
1469 ))
1470 })?;
1471 }
1472 if let Some(parent) = dest.parent() {
1473 if !parent.as_os_str().is_empty() && !parent.is_dir() {
1474 return Err(ApiError::bad_request(format!(
1475 "destination parent directory {} does not exist",
1476 parent.display()
1477 )));
1478 }
1479 }
1480
1481 let started = std::time::Instant::now();
1482 tenant.write().backup(dest.clone()).await.map_err(ApiError::from)?;
1483 let elapsed_ms = started.elapsed().as_millis() as u64;
1484
1485 Ok(Json(BackupResponse {
1486 path: dest.display().to_string(),
1487 elapsed_ms,
1488 }))
1489}
1490
/// Neighbour count used by `graph_expand_handler` when `limit` is omitted.
const GRAPH_EXPAND_DEFAULT_LIMIT: u32 = 25;
/// Upper bound `graph_expand_handler` clamps any requested `limit` to
/// (the lower bound is 1).
const GRAPH_EXPAND_MAX_LIMIT: u32 = 100;
1531
/// Relationship to follow from the source node of a graph-expansion request.
///
/// Deserialized from the snake_case strings `cluster_member`,
/// `document_chunk`, `triple` and `semantic`.
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
enum GraphExpandKind {
    /// Episode ↔ cluster membership (episode or cluster source).
    ClusterMember,
    /// Document ↔ chunk containment (document or chunk source).
    DocumentChunk,
    /// Subject/predicate/object triples (episode or entity source).
    Triple,
    /// Handled by `expand_semantic`, whose behaviour is not visible here.
    Semantic,
}
1542
/// Query-string parameters accepted by `graph_expand_handler`.
#[derive(Debug, Deserialize)]
struct GraphExpandQuery {
    /// Source node as `<prefix>:<value>` (see `parse_node_id`).
    node_id: String,
    /// Relationship to expand along.
    kind: GraphExpandKind,
    /// Optional neighbour cap; defaults to `GRAPH_EXPAND_DEFAULT_LIMIT` and is
    /// clamped to `1..=GRAPH_EXPAND_MAX_LIMIT` by the handler.
    #[serde(default)]
    limit: Option<u32>,
}
1550
/// Kinds of node that can appear in a graph-expansion request or response.
///
/// Each variant corresponds to one `node_id` prefix accepted by
/// `parse_node_id`: `ep:`, `doc:`, `chunk:`, `cl:` and `ent:`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeKind {
    Episode,
    Document,
    Chunk,
    Cluster,
    Entity,
}

impl NodeKind {
    /// Lower-case name written to the `kind` field of serialized graph nodes
    /// and used in error messages.
    fn as_wire_str(self) -> &'static str {
        match self {
            NodeKind::Episode => "episode",
            NodeKind::Document => "document",
            NodeKind::Chunk => "chunk",
            NodeKind::Cluster => "cluster",
            NodeKind::Entity => "entity",
        }
    }
}
1572
1573fn parse_node_id(raw: &str) -> Result<(NodeKind, &str), ApiError> {
1576 let (prefix, value) = raw.split_once(':').ok_or_else(|| {
1577 ApiError::bad_request(format!(
1578 "node_id must be `<prefix>:<value>` (one of ep:/doc:/chunk:/cl:/ent:); got {raw:?}"
1579 ))
1580 })?;
1581 if value.is_empty() {
1582 return Err(ApiError::bad_request(format!(
1583 "node_id value is empty after prefix: {raw:?}"
1584 )));
1585 }
1586 let kind = match prefix {
1587 "ep" => NodeKind::Episode,
1588 "doc" => NodeKind::Document,
1589 "chunk" => NodeKind::Chunk,
1590 "cl" => NodeKind::Cluster,
1591 "ent" => NodeKind::Entity,
1592 other => {
1593 return Err(ApiError::bad_request(format!(
1594 "unknown node_id prefix {other:?}; expected one of ep:/doc:/chunk:/cl:/ent:"
1595 )));
1596 }
1597 };
1598 Ok((kind, value))
1599}
1600
/// One node of a graph-expansion response.
///
/// `ts_ms` and `preview` are left out of the JSON when `None`.
#[derive(Debug, Serialize)]
struct GraphNode {
    /// Prefixed id (`ep:`, `doc:`, `chunk:`, `cl:` or `ent:` plus the raw id).
    id: String,
    /// Wire name of the node kind, from `NodeKind::as_wire_str`.
    kind: &'static str,
    /// Short display label, truncated by the `graph_node_for_*` builders.
    label: String,
    /// Creation/ingest timestamp in ms where the node kind has one.
    #[serde(skip_serializing_if = "Option::is_none")]
    ts_ms: Option<i64>,
    /// Tenant the node was read from.
    tenant_id: String,
    /// Longer preview text (or the document source path), when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    preview: Option<String>,
}
1614
/// One directed edge of a graph-expansion response.
///
/// `predicate` and `weight` are left out of the JSON when `None`.
#[derive(Debug, Serialize)]
struct GraphEdge {
    /// Deterministic id built by `edge_id(source, kind, target)`.
    id: String,
    /// Prefixed id of the node being expanded.
    source: String,
    /// Prefixed id of the neighbour.
    target: String,
    /// Relationship name, e.g. `cluster_member` or `document_chunk`.
    kind: &'static str,
    /// Presumably the triple predicate for triple edges — confirm in the
    /// triple expanders.
    #[serde(skip_serializing_if = "Option::is_none")]
    predicate: Option<String>,
    /// Optional edge weight; presumably triple confidence or a similarity
    /// score — confirm in the triple/semantic expanders.
    #[serde(skip_serializing_if = "Option::is_none")]
    weight: Option<f32>,
}
1628
/// JSON body returned by `graph_expand_handler`: the neighbours found and
/// the edges linking them to the source node.
#[derive(Debug, Serialize)]
struct GraphExpandResponse {
    nodes: Vec<GraphNode>,
    edges: Vec<GraphEdge>,
}
1634
/// Builds the id of an edge as `<source>--<kind>--<target>`.
///
/// The id is a pure function of its inputs, so the same relationship always
/// gets the same id.
fn edge_id(source: &str, kind: &str, target: &str) -> String {
    [source, kind, target].join("--")
}
1638
/// Episode row fetched during graph expansion (`memory_id`, `ts_ms`,
/// `content` columns of `episodes`).
#[derive(Debug)]
struct ExpandedEpisode {
    memory_id: String,
    ts_ms: i64,
    content: String,
}
1646
/// Document row fetched during graph expansion (`doc_id`, `title`, `source`,
/// `ingested_at_ms` columns of `documents`).
#[derive(Debug)]
struct ExpandedDocument {
    doc_id: String,
    /// Optional title; preferred for the node label.
    title: Option<String>,
    /// Optional source path; label fallback and node preview.
    source: Option<String>,
    ingested_at_ms: i64,
}
1655
/// Chunk row fetched during graph expansion (`chunk_id`, `chunk_index`,
/// `content` columns of `document_chunks`).
#[derive(Debug)]
struct ExpandedChunk {
    chunk_id: String,
    /// Position of the chunk within its document; chunks are listed in
    /// ascending order of this value.
    chunk_index: i64,
    content: String,
}
1663
/// Truncates `s` to at most `max` characters (Unicode scalar values, not
/// bytes), marking any cut with a trailing `…`.
///
/// Strings that already fit are returned unchanged. When truncation happens,
/// the result is exactly `max` characters: the first `max - 1` characters of
/// `s` followed by `…`. With `max == 0` there is no room even for the
/// ellipsis, so a non-empty `s` yields an empty string. Previously,
/// `max - 1` underflowed in that case.
fn truncate_preview(s: &str, max: usize) -> String {
    // `nth(max)` stops at the first character past the limit instead of
    // counting the whole string; `None` means `s` already fits.
    if s.char_indices().nth(max).is_none() {
        return s.to_string();
    }
    if max == 0 {
        return String::new();
    }
    // Byte offset of character `max - 1`: everything before it is kept.
    // At least `max + 1` characters exist here, so the lookup always succeeds.
    let cut = s.char_indices().nth(max - 1).map_or(s.len(), |(i, _)| i);
    let mut out = String::with_capacity(cut + '…'.len_utf8());
    out.push_str(&s[..cut]);
    out.push('…');
    out
}

/// Maximum characters in a graph node label.
const GRAPH_LABEL_CHARS: usize = 80;
/// Maximum characters in a graph node preview.
const GRAPH_PREVIEW_CHARS: usize = 200;

/// Label for text-bearing nodes: the first line of `content`, truncated to
/// `GRAPH_LABEL_CHARS`. Empty content yields an empty label.
fn episode_label(content: &str) -> String {
    let first_line = content.lines().next().unwrap_or(content);
    truncate_preview(first_line, GRAPH_LABEL_CHARS)
}
1682
1683fn graph_node_for_episode(tenant_id: &str, ep: &ExpandedEpisode) -> GraphNode {
1684 GraphNode {
1685 id: format!("ep:{}", ep.memory_id),
1686 kind: NodeKind::Episode.as_wire_str(),
1687 label: episode_label(&ep.content),
1688 ts_ms: Some(ep.ts_ms),
1689 tenant_id: tenant_id.to_string(),
1690 preview: Some(truncate_preview(&ep.content, GRAPH_PREVIEW_CHARS)),
1691 }
1692}
1693
1694fn graph_node_for_document(tenant_id: &str, d: &ExpandedDocument) -> GraphNode {
1695 let label = d
1696 .title
1697 .clone()
1698 .or_else(|| d.source.clone())
1699 .unwrap_or_else(|| d.doc_id.clone());
1700 GraphNode {
1701 id: format!("doc:{}", d.doc_id),
1702 kind: NodeKind::Document.as_wire_str(),
1703 label: truncate_preview(&label, GRAPH_LABEL_CHARS),
1704 ts_ms: Some(d.ingested_at_ms),
1705 tenant_id: tenant_id.to_string(),
1706 preview: d.source.clone(),
1707 }
1708}
1709
1710fn graph_node_for_chunk(tenant_id: &str, c: &ExpandedChunk) -> GraphNode {
1711 GraphNode {
1712 id: format!("chunk:{}", c.chunk_id),
1713 kind: NodeKind::Chunk.as_wire_str(),
1714 label: format!("chunk #{}: {}", c.chunk_index, episode_label(&c.content)),
1715 ts_ms: None,
1716 tenant_id: tenant_id.to_string(),
1717 preview: Some(truncate_preview(&c.content, GRAPH_PREVIEW_CHARS)),
1718 }
1719}
1720
1721fn graph_node_for_cluster(
1722 tenant_id: &str,
1723 cluster_id: &str,
1724 abstraction: Option<&str>,
1725 created_at_ms: i64,
1726) -> GraphNode {
1727 let label = abstraction
1728 .map(|a| truncate_preview(a, GRAPH_LABEL_CHARS))
1729 .unwrap_or_else(|| format!("cluster {cluster_id}"));
1730 GraphNode {
1731 id: format!("cl:{cluster_id}"),
1732 kind: NodeKind::Cluster.as_wire_str(),
1733 label,
1734 ts_ms: Some(created_at_ms),
1735 tenant_id: tenant_id.to_string(),
1736 preview: abstraction.map(|a| truncate_preview(a, GRAPH_PREVIEW_CHARS)),
1737 }
1738}
1739
1740fn graph_node_for_entity(tenant_id: &str, value: &str) -> GraphNode {
1741 GraphNode {
1742 id: format!("ent:{value}"),
1743 kind: NodeKind::Entity.as_wire_str(),
1744 label: truncate_preview(value, GRAPH_LABEL_CHARS),
1745 ts_ms: None,
1746 tenant_id: tenant_id.to_string(),
1747 preview: None,
1748 }
1749}
1750
1751async fn graph_expand_handler(
1753 TenantExtractor(tenant): TenantExtractor,
1754 Query(q): Query<GraphExpandQuery>,
1755) -> Result<Json<GraphExpandResponse>, ApiError> {
1756 let limit = q.limit.unwrap_or(GRAPH_EXPAND_DEFAULT_LIMIT);
1760 let limit = limit.clamp(1, GRAPH_EXPAND_MAX_LIMIT) as i64;
1761
1762 let (node_kind, value) = parse_node_id(&q.node_id)?;
1763 let value = value.to_string();
1764 let node_id_full = q.node_id.clone();
1765 let tenant_id_str = tenant.tenant_id().to_string();
1766
1767 match q.kind {
1768 GraphExpandKind::ClusterMember => {
1769 expand_cluster_member(&tenant, &tenant_id_str, node_kind, &value, &node_id_full, limit)
1770 .await
1771 }
1772 GraphExpandKind::DocumentChunk => {
1773 expand_document_chunk(&tenant, &tenant_id_str, node_kind, &value, &node_id_full, limit)
1774 .await
1775 }
1776 GraphExpandKind::Triple => {
1777 expand_triple(&tenant, &tenant_id_str, node_kind, &value, &node_id_full, limit).await
1778 }
1779 GraphExpandKind::Semantic => {
1780 expand_semantic(&tenant, &tenant_id_str, node_kind, &value, &node_id_full, limit).await
1781 }
1782 }
1783 .map(Json)
1784}
1785
1786async fn expand_cluster_member(
1789 tenant: &TenantHandle,
1790 tenant_id: &str,
1791 node_kind: NodeKind,
1792 value: &str,
1793 node_id_full: &str,
1794 limit: i64,
1795) -> Result<GraphExpandResponse, ApiError> {
1796 match node_kind {
1797 NodeKind::Episode => expand_cluster_member_from_episode(
1798 tenant,
1799 tenant_id,
1800 value.to_string(),
1801 node_id_full.to_string(),
1802 limit,
1803 )
1804 .await,
1805 NodeKind::Cluster => expand_cluster_member_from_cluster(
1806 tenant,
1807 tenant_id,
1808 value.to_string(),
1809 node_id_full.to_string(),
1810 limit,
1811 )
1812 .await,
1813 _ => Err(ApiError::bad_request(format!(
1814 "kind=cluster_member only valid for episode or cluster source nodes; got {}",
1815 node_kind.as_wire_str()
1816 ))),
1817 }
1818}
1819
1820async fn expand_cluster_member_from_episode(
1821 tenant: &TenantHandle,
1822 tenant_id: &str,
1823 memory_id: String,
1824 node_id_full: String,
1825 limit: i64,
1826) -> Result<GraphExpandResponse, ApiError> {
1827 let memory_id_for_err = memory_id.clone();
1828 let rows: Vec<(String, Option<String>, i64)> = tenant
1829 .read()
1830 .interact(move |conn| {
1831 let exists: i64 = conn.query_row(
1833 "SELECT COUNT(*) FROM episodes WHERE memory_id = ?1",
1834 rusqlite::params![&memory_id],
1835 |r| r.get(0),
1836 )?;
1837 if exists == 0 {
1838 return Ok(Vec::new());
1839 }
1840 let mut stmt = conn.prepare(
1841 "SELECT c.cluster_id, sa.content, c.created_at_ms
1842 FROM cluster_episodes ce
1843 JOIN clusters c ON c.cluster_id = ce.cluster_id
1844 LEFT JOIN semantic_abstractions sa ON sa.cluster_id = c.cluster_id
1845 WHERE ce.memory_id = ?1
1846 ORDER BY c.created_at_ms DESC
1847 LIMIT ?2",
1848 )?;
1849 let mapped = stmt
1850 .query_map(rusqlite::params![&memory_id, limit], |r| {
1851 Ok((
1852 r.get::<_, String>(0)?,
1853 r.get::<_, Option<String>>(1)?,
1854 r.get::<_, i64>(2)?,
1855 ))
1856 })?
1857 .collect::<rusqlite::Result<Vec<_>>>()?;
1858 Ok::<_, rusqlite::Error>(mapped)
1865 })
1866 .await
1867 .map_err(ApiError::from)?;
1868
1869 if rows.is_empty() {
1876 ensure_episode_exists(tenant, &memory_id_for_err, &node_id_full).await?;
1877 return Ok(GraphExpandResponse {
1878 nodes: Vec::new(),
1879 edges: Vec::new(),
1880 });
1881 }
1882
1883 let mut nodes = Vec::with_capacity(rows.len());
1884 let mut edges = Vec::with_capacity(rows.len());
1885 for (cluster_id, abstraction, created_at_ms) in rows {
1886 let target_id = format!("cl:{cluster_id}");
1887 edges.push(GraphEdge {
1888 id: edge_id(&node_id_full, "cluster_member", &target_id),
1889 source: node_id_full.clone(),
1890 target: target_id,
1891 kind: "cluster_member",
1892 predicate: None,
1893 weight: None,
1894 });
1895 nodes.push(graph_node_for_cluster(
1896 tenant_id,
1897 &cluster_id,
1898 abstraction.as_deref(),
1899 created_at_ms,
1900 ));
1901 }
1902 Ok(GraphExpandResponse { nodes, edges })
1903}
1904
1905async fn expand_cluster_member_from_cluster(
1906 tenant: &TenantHandle,
1907 tenant_id: &str,
1908 cluster_id: String,
1909 node_id_full: String,
1910 limit: i64,
1911) -> Result<GraphExpandResponse, ApiError> {
1912 let cluster_id_for_err = cluster_id.clone();
1913 let rows: Vec<ExpandedEpisode> = tenant
1914 .read()
1915 .interact(move |conn| {
1916 let exists: i64 = conn.query_row(
1917 "SELECT COUNT(*) FROM clusters WHERE cluster_id = ?1",
1918 rusqlite::params![&cluster_id],
1919 |r| r.get(0),
1920 )?;
1921 if exists == 0 {
1922 return Ok(Vec::new());
1923 }
1924 let mut stmt = conn.prepare(
1925 "SELECT e.memory_id, e.ts_ms, e.content
1926 FROM cluster_episodes ce
1927 JOIN episodes e ON e.memory_id = ce.memory_id
1928 WHERE ce.cluster_id = ?1
1929 AND e.status = 'active'
1930 ORDER BY e.ts_ms DESC
1931 LIMIT ?2",
1932 )?;
1933 let mapped = stmt
1934 .query_map(rusqlite::params![&cluster_id, limit], |r| {
1935 Ok(ExpandedEpisode {
1936 memory_id: r.get(0)?,
1937 ts_ms: r.get(1)?,
1938 content: r.get(2)?,
1939 })
1940 })?
1941 .collect::<rusqlite::Result<Vec<_>>>()?;
1942 Ok::<_, rusqlite::Error>(mapped)
1943 })
1944 .await
1945 .map_err(ApiError::from)?;
1946
1947 if rows.is_empty() {
1948 ensure_cluster_exists(tenant, &cluster_id_for_err, &node_id_full).await?;
1949 return Ok(GraphExpandResponse {
1950 nodes: Vec::new(),
1951 edges: Vec::new(),
1952 });
1953 }
1954
1955 let mut nodes = Vec::with_capacity(rows.len());
1956 let mut edges = Vec::with_capacity(rows.len());
1957 for ep in rows {
1958 let target_id = format!("ep:{}", ep.memory_id);
1959 edges.push(GraphEdge {
1960 id: edge_id(&node_id_full, "cluster_member", &target_id),
1961 source: node_id_full.clone(),
1962 target: target_id,
1963 kind: "cluster_member",
1964 predicate: None,
1965 weight: None,
1966 });
1967 nodes.push(graph_node_for_episode(tenant_id, &ep));
1968 }
1969 Ok(GraphExpandResponse { nodes, edges })
1970}
1971
1972async fn expand_document_chunk(
1975 tenant: &TenantHandle,
1976 tenant_id: &str,
1977 node_kind: NodeKind,
1978 value: &str,
1979 node_id_full: &str,
1980 limit: i64,
1981) -> Result<GraphExpandResponse, ApiError> {
1982 match node_kind {
1983 NodeKind::Document => expand_document_chunk_from_document(
1984 tenant,
1985 tenant_id,
1986 value.to_string(),
1987 node_id_full.to_string(),
1988 limit,
1989 )
1990 .await,
1991 NodeKind::Chunk => expand_document_chunk_from_chunk(
1992 tenant,
1993 tenant_id,
1994 value.to_string(),
1995 node_id_full.to_string(),
1996 )
1997 .await,
1998 _ => Err(ApiError::bad_request(format!(
1999 "kind=document_chunk only valid for document or chunk source nodes; got {}",
2000 node_kind.as_wire_str()
2001 ))),
2002 }
2003}
2004
2005async fn expand_document_chunk_from_document(
2006 tenant: &TenantHandle,
2007 tenant_id: &str,
2008 doc_id: String,
2009 node_id_full: String,
2010 limit: i64,
2011) -> Result<GraphExpandResponse, ApiError> {
2012 let doc_id_for_err = doc_id.clone();
2013 let rows: Vec<ExpandedChunk> = tenant
2014 .read()
2015 .interact(move |conn| {
2016 let exists: i64 = conn.query_row(
2017 "SELECT COUNT(*) FROM documents WHERE doc_id = ?1",
2018 rusqlite::params![&doc_id],
2019 |r| r.get(0),
2020 )?;
2021 if exists == 0 {
2022 return Ok(Vec::new());
2023 }
2024 let mut stmt = conn.prepare(
2025 "SELECT chunk_id, chunk_index, content
2026 FROM document_chunks
2027 WHERE doc_id = ?1
2028 ORDER BY chunk_index ASC
2029 LIMIT ?2",
2030 )?;
2031 let mapped = stmt
2032 .query_map(rusqlite::params![&doc_id, limit], |r| {
2033 Ok(ExpandedChunk {
2034 chunk_id: r.get(0)?,
2035 chunk_index: r.get(1)?,
2036 content: r.get(2)?,
2037 })
2038 })?
2039 .collect::<rusqlite::Result<Vec<_>>>()?;
2040 Ok::<_, rusqlite::Error>(mapped)
2041 })
2042 .await
2043 .map_err(ApiError::from)?;
2044
2045 if rows.is_empty() {
2046 ensure_document_exists(tenant, &doc_id_for_err, &node_id_full).await?;
2047 return Ok(GraphExpandResponse {
2048 nodes: Vec::new(),
2049 edges: Vec::new(),
2050 });
2051 }
2052
2053 let mut nodes = Vec::with_capacity(rows.len());
2054 let mut edges = Vec::with_capacity(rows.len());
2055 for c in rows {
2056 let target_id = format!("chunk:{}", c.chunk_id);
2057 edges.push(GraphEdge {
2058 id: edge_id(&node_id_full, "document_chunk", &target_id),
2059 source: node_id_full.clone(),
2060 target: target_id,
2061 kind: "document_chunk",
2062 predicate: None,
2063 weight: None,
2064 });
2065 nodes.push(graph_node_for_chunk(tenant_id, &c));
2066 }
2067 Ok(GraphExpandResponse { nodes, edges })
2068}
2069
2070async fn expand_document_chunk_from_chunk(
2071 tenant: &TenantHandle,
2072 tenant_id: &str,
2073 chunk_id: String,
2074 node_id_full: String,
2075) -> Result<GraphExpandResponse, ApiError> {
2076 let chunk_id_for_err = chunk_id.clone();
2077 let row: Option<ExpandedDocument> = tenant
2078 .read()
2079 .interact(move |conn| {
2080 conn.query_row(
2081 "SELECT d.doc_id, d.title, d.source, d.ingested_at_ms
2082 FROM document_chunks c
2083 JOIN documents d ON d.doc_id = c.doc_id
2084 WHERE c.chunk_id = ?1",
2085 rusqlite::params![&chunk_id],
2086 |r| {
2087 Ok(ExpandedDocument {
2088 doc_id: r.get(0)?,
2089 title: r.get(1)?,
2090 source: r.get(2)?,
2091 ingested_at_ms: r.get(3)?,
2092 })
2093 },
2094 )
2095 .map(Some)
2096 .or_else(|e| match e {
2097 rusqlite::Error::QueryReturnedNoRows => Ok(None),
2098 other => Err(other),
2099 })
2100 })
2101 .await
2102 .map_err(ApiError::from)?;
2103
2104 let d = row.ok_or_else(|| {
2105 ApiError::not_found(format!(
2106 "node_id {node_id_full:?} (chunk_id {chunk_id_for_err}) not found in current tenant"
2107 ))
2108 })?;
2109 let target_id = format!("doc:{}", d.doc_id);
2110 let edge = GraphEdge {
2111 id: edge_id(&node_id_full, "document_chunk", &target_id),
2112 source: node_id_full.clone(),
2113 target: target_id,
2114 kind: "document_chunk",
2115 predicate: None,
2116 weight: None,
2117 };
2118 let node = graph_node_for_document(tenant_id, &d);
2119 Ok(GraphExpandResponse {
2120 nodes: vec![node],
2121 edges: vec![edge],
2122 })
2123}
2124
2125async fn expand_triple(
2128 tenant: &TenantHandle,
2129 tenant_id: &str,
2130 node_kind: NodeKind,
2131 value: &str,
2132 node_id_full: &str,
2133 limit: i64,
2134) -> Result<GraphExpandResponse, ApiError> {
2135 match node_kind {
2136 NodeKind::Episode => expand_triple_from_episode(
2137 tenant,
2138 tenant_id,
2139 value.to_string(),
2140 node_id_full.to_string(),
2141 limit,
2142 )
2143 .await,
2144 NodeKind::Entity => expand_triple_from_entity(
2145 tenant,
2146 tenant_id,
2147 value.to_string(),
2148 node_id_full.to_string(),
2149 limit,
2150 )
2151 .await,
2152 _ => Err(ApiError::bad_request(format!(
2153 "kind=triple only valid for episode or entity source nodes; got {}",
2154 node_kind.as_wire_str()
2155 ))),
2156 }
2157}
2158
/// Row of the `triples` table fetched during triple expansion
/// (`subject_id`, `predicate`, `object_id`, `confidence` columns).
#[derive(Debug)]
struct TripleRow {
    /// Entity value of the subject; rendered as an `ent:` node.
    subject_id: String,
    predicate: String,
    /// Entity value of the object; rendered as an `ent:` node.
    object_id: String,
    confidence: f32,
}
2166
2167async fn expand_triple_from_episode(
2168 tenant: &TenantHandle,
2169 tenant_id: &str,
2170 memory_id: String,
2171 node_id_full: String,
2172 limit: i64,
2173) -> Result<GraphExpandResponse, ApiError> {
2174 let memory_id_for_err = memory_id.clone();
2175 let rows: Vec<TripleRow> = tenant
2176 .read()
2177 .interact(move |conn| {
2178 let rowid_opt: Option<i64> = conn
2180 .query_row(
2181 "SELECT rowid FROM episodes WHERE memory_id = ?1",
2182 rusqlite::params![&memory_id],
2183 |r| r.get(0),
2184 )
2185 .map(Some)
2186 .or_else(|e| match e {
2187 rusqlite::Error::QueryReturnedNoRows => Ok(None),
2188 other => Err(other),
2189 })?;
2190 let Some(rowid) = rowid_opt else {
2191 return Ok(Vec::new());
2192 };
2193 let mut stmt = conn.prepare(
2194 "SELECT subject_id, predicate, object_id, confidence
2195 FROM triples
2196 WHERE source_episode_id = ?1
2197 AND status = 'active'
2198 ORDER BY valid_from_ms DESC
2199 LIMIT ?2",
2200 )?;
2201 let mapped = stmt
2202 .query_map(rusqlite::params![rowid, limit], |r| {
2203 Ok(TripleRow {
2204 subject_id: r.get(0)?,
2205 predicate: r.get(1)?,
2206 object_id: r.get(2)?,
2207 confidence: r.get(3)?,
2208 })
2209 })?
2210 .collect::<rusqlite::Result<Vec<_>>>()?;
2211 Ok::<_, rusqlite::Error>(mapped)
2212 })
2213 .await
2214 .map_err(ApiError::from)?;
2215
2216 if rows.is_empty() {
2217 ensure_episode_exists(tenant, &memory_id_for_err, &node_id_full).await?;
2218 return Ok(GraphExpandResponse {
2219 nodes: Vec::new(),
2220 edges: Vec::new(),
2221 });
2222 }
2223
2224 let mut nodes = Vec::new();
2225 let mut edges = Vec::new();
2226 let mut seen_entities: std::collections::HashSet<String> = Default::default();
2227 for t in rows {
2228 let subj_id = format!("ent:{}", t.subject_id);
2239 let obj_id = format!("ent:{}", t.object_id);
2240 if seen_entities.insert(t.subject_id.clone()) {
2241 nodes.push(graph_node_for_entity(tenant_id, &t.subject_id));
2242 }
2243 if seen_entities.insert(t.object_id.clone()) {
2244 nodes.push(graph_node_for_entity(tenant_id, &t.object_id));
2245 }
2246 edges.push(GraphEdge {
2247 id: edge_id(&subj_id, "triple", &obj_id),
2248 source: subj_id,
2249 target: obj_id,
2250 kind: "triple",
2251 predicate: Some(t.predicate),
2252 weight: Some(t.confidence),
2253 });
2254 }
2255 Ok(GraphExpandResponse { nodes, edges })
2256}
2257
2258async fn expand_triple_from_entity(
2259 tenant: &TenantHandle,
2260 tenant_id: &str,
2261 entity_value: String,
2262 node_id_full: String,
2263 limit: i64,
2264) -> Result<GraphExpandResponse, ApiError> {
2265 let entity_q = entity_value.clone();
2268 let rows: Vec<ExpandedEpisode> = tenant
2269 .read()
2270 .interact(move |conn| {
2271 let mut stmt = conn.prepare(
2274 "SELECT DISTINCT e.memory_id, e.ts_ms, e.content
2275 FROM triples t
2276 JOIN episodes e ON e.rowid = t.source_episode_id
2277 WHERE (t.subject_id = ?1 OR t.object_id = ?1)
2278 AND t.status = 'active'
2279 AND t.source_episode_id IS NOT NULL
2280 AND e.status = 'active'
2281 ORDER BY e.ts_ms DESC
2282 LIMIT ?2",
2283 )?;
2284 let mapped = stmt
2285 .query_map(rusqlite::params![&entity_q, limit], |r| {
2286 Ok(ExpandedEpisode {
2287 memory_id: r.get(0)?,
2288 ts_ms: r.get(1)?,
2289 content: r.get(2)?,
2290 })
2291 })?
2292 .collect::<rusqlite::Result<Vec<_>>>()?;
2293 Ok::<_, rusqlite::Error>(mapped)
2294 })
2295 .await
2296 .map_err(ApiError::from)?;
2297
2298 let mut nodes = Vec::with_capacity(rows.len());
2301 let mut edges = Vec::with_capacity(rows.len());
2302 for ep in rows {
2303 let target_id = format!("ep:{}", ep.memory_id);
2304 edges.push(GraphEdge {
2305 id: edge_id(&node_id_full, "triple", &target_id),
2306 source: node_id_full.clone(),
2307 target: target_id,
2308 kind: "triple",
2309 predicate: None,
2310 weight: None,
2311 });
2312 nodes.push(graph_node_for_episode(tenant_id, &ep));
2313 }
2314 let _ = entity_value;
2316 Ok(GraphExpandResponse { nodes, edges })
2317}
2318
2319async fn expand_semantic(
2322 tenant: &TenantHandle,
2323 tenant_id: &str,
2324 node_kind: NodeKind,
2325 value: &str,
2326 node_id_full: &str,
2327 limit: i64,
2328) -> Result<GraphExpandResponse, ApiError> {
2329 if node_kind != NodeKind::Episode {
2330 return Err(ApiError::bad_request(format!(
2331 "kind=semantic only valid for episode source nodes; got {}",
2332 node_kind.as_wire_str()
2333 )));
2334 }
2335 let memory_id = value.to_string();
2336 let memory_id_q = memory_id.clone();
2337 let content: Option<String> = tenant
2342 .read()
2343 .interact(move |conn| {
2344 conn.query_row(
2345 "SELECT content FROM episodes WHERE memory_id = ?1 AND status = 'active'",
2346 rusqlite::params![&memory_id_q],
2347 |r| r.get::<_, String>(0),
2348 )
2349 .map(Some)
2350 .or_else(|e| match e {
2351 rusqlite::Error::QueryReturnedNoRows => Ok(None),
2352 other => Err(other),
2353 })
2354 })
2355 .await
2356 .map_err(ApiError::from)?;
2357
2358 let content = content.ok_or_else(|| {
2359 ApiError::not_found(format!(
2360 "node_id {node_id_full:?} (memory_id {memory_id}) not found in current tenant"
2361 ))
2362 })?;
2363
2364 let widened = (limit as usize).saturating_add(1).min(100);
2367 let result = solo_query::recall::run_recall_inner(
2368 tenant.embedder(),
2369 tenant.hnsw(),
2370 tenant.read(),
2371 &content,
2372 widened,
2373 )
2374 .await
2375 .map_err(ApiError::from)?;
2376
2377 let mut nodes = Vec::new();
2378 let mut edges = Vec::new();
2379 for hit in result.hits.into_iter() {
2380 if hit.memory_id == memory_id {
2381 continue;
2383 }
2384 if nodes.len() as i64 >= limit {
2385 break;
2386 }
2387 let weight = (1.0 - hit.cos_distance).max(0.0);
2391 let target_id = format!("ep:{}", hit.memory_id);
2392 edges.push(GraphEdge {
2393 id: edge_id(node_id_full, "semantic", &target_id),
2394 source: node_id_full.to_string(),
2395 target: target_id,
2396 kind: "semantic",
2397 predicate: None,
2398 weight: Some(weight),
2399 });
2400 nodes.push(GraphNode {
2401 id: format!("ep:{}", hit.memory_id),
2402 kind: NodeKind::Episode.as_wire_str(),
2403 label: episode_label(&hit.content),
2404 ts_ms: None,
2405 tenant_id: tenant_id.to_string(),
2406 preview: Some(truncate_preview(&hit.content, GRAPH_PREVIEW_CHARS)),
2407 });
2408 }
2409 Ok(GraphExpandResponse { nodes, edges })
2410}
2411
2412async fn ensure_episode_exists(
2416 tenant: &TenantHandle,
2417 memory_id: &str,
2418 node_id_full: &str,
2419) -> Result<(), ApiError> {
2420 let memory_id_q = memory_id.to_string();
2421 let exists: i64 = tenant
2422 .read()
2423 .interact(move |conn| {
2424 conn.query_row(
2425 "SELECT COUNT(*) FROM episodes WHERE memory_id = ?1",
2426 rusqlite::params![&memory_id_q],
2427 |r| r.get(0),
2428 )
2429 })
2430 .await
2431 .map_err(ApiError::from)?;
2432 if exists == 0 {
2433 return Err(ApiError::not_found(format!(
2434 "node_id {node_id_full:?} not found in current tenant"
2435 )));
2436 }
2437 Ok(())
2438}
2439
2440async fn ensure_cluster_exists(
2441 tenant: &TenantHandle,
2442 cluster_id: &str,
2443 node_id_full: &str,
2444) -> Result<(), ApiError> {
2445 let cluster_id_q = cluster_id.to_string();
2446 let exists: i64 = tenant
2447 .read()
2448 .interact(move |conn| {
2449 conn.query_row(
2450 "SELECT COUNT(*) FROM clusters WHERE cluster_id = ?1",
2451 rusqlite::params![&cluster_id_q],
2452 |r| r.get(0),
2453 )
2454 })
2455 .await
2456 .map_err(ApiError::from)?;
2457 if exists == 0 {
2458 return Err(ApiError::not_found(format!(
2459 "node_id {node_id_full:?} not found in current tenant"
2460 )));
2461 }
2462 Ok(())
2463}
2464
2465async fn ensure_document_exists(
2466 tenant: &TenantHandle,
2467 doc_id: &str,
2468 node_id_full: &str,
2469) -> Result<(), ApiError> {
2470 let doc_id_q = doc_id.to_string();
2471 let exists: i64 = tenant
2472 .read()
2473 .interact(move |conn| {
2474 conn.query_row(
2475 "SELECT COUNT(*) FROM documents WHERE doc_id = ?1",
2476 rusqlite::params![&doc_id_q],
2477 |r| r.get(0),
2478 )
2479 })
2480 .await
2481 .map_err(ApiError::from)?;
2482 if exists == 0 {
2483 return Err(ApiError::not_found(format!(
2484 "node_id {node_id_full:?} not found in current tenant"
2485 )));
2486 }
2487 Ok(())
2488}
2489
/// Page size used by `graph_nodes_handler` when `limit` is omitted.
const GRAPH_NODES_DEFAULT_LIMIT: u32 = 100;
/// Upper bound `graph_nodes_handler` clamps `limit` to.
const GRAPH_NODES_MAX_LIMIT: u32 = 1000;
/// Page size used by `graph_edges_handler` when `limit` is omitted.
const GRAPH_EDGES_DEFAULT_LIMIT: u32 = 200;
/// Upper bound `graph_edges_handler` clamps `limit` to. Four times this value is
/// also the per-kind row cap of the `fetch_*_edges` queries.
const GRAPH_EDGES_MAX_LIMIT: u32 = 2000;
/// Maximum number of entity nodes returned by `fetch_entities_for_nodes`.
const GRAPH_ENTITY_CAP: usize = 200;

/// Response header set to `true` by `graph_nodes_handler` when the entity list
/// was truncated at `GRAPH_ENTITY_CAP`.
const ENTITY_CAP_HEADER: &str = "x-solo-entity-cap-reached";
2513
/// Query parameters accepted by `graph_nodes_handler`.
#[derive(Debug, Deserialize)]
struct GraphNodesQuery {
    /// Comma-separated node kinds (`episode,document,chunk,cluster,entity`).
    /// Omitted or blank means all kinds.
    #[serde(default)]
    kind: Option<String>,
    /// Inclusive lower bound on the node timestamp (ms).
    #[serde(default)]
    since_ms: Option<i64>,
    /// Inclusive upper bound on the node timestamp (ms).
    #[serde(default)]
    until_ms: Option<i64>,
    /// Page size. Defaults to `GRAPH_NODES_DEFAULT_LIMIT` and is clamped to
    /// `1..=GRAPH_NODES_MAX_LIMIT`.
    #[serde(default)]
    limit: Option<u32>,
    /// `next_cursor` from a previous page. An empty string is treated as absent.
    #[serde(default)]
    cursor: Option<String>,
}

/// Query parameters accepted by `graph_edges_handler` (`/v1/graph/edges`).
#[derive(Debug, Deserialize)]
struct GraphEdgesQuery {
    /// Optional focus node id (`<prefix>:<value>`); only edges touching it are kept.
    #[serde(default)]
    node_id: Option<String>,
    /// Comma-separated edge types, sent as the `type` query parameter
    /// (`triple,document_chunk,cluster_member`).
    #[serde(default)]
    r#type: Option<String>,
    /// Page size. Defaults to `GRAPH_EDGES_DEFAULT_LIMIT` and is clamped to
    /// `1..=GRAPH_EDGES_MAX_LIMIT`.
    #[serde(default)]
    limit: Option<u32>,
    /// `next_cursor` from a previous page. An empty string is treated as absent.
    #[serde(default)]
    cursor: Option<String>,
}

/// JSON body returned by `graph_nodes_handler`.
#[derive(Debug, Serialize)]
struct GraphNodesResponse {
    nodes: Vec<GraphNode>,
    /// Present only when another page exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    next_cursor: Option<String>,
}

/// JSON body returned by `graph_edges_handler`.
#[derive(Debug, Serialize)]
struct GraphEdgesResponse {
    edges: Vec<GraphEdge>,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    next_cursor: Option<String>,
}
2559
2560fn parse_node_kind_filter(raw: Option<&str>) -> Result<Vec<NodeKind>, ApiError> {
2564 let raw = raw.unwrap_or("").trim();
2565 if raw.is_empty() {
2566 return Ok(vec![
2567 NodeKind::Episode,
2568 NodeKind::Document,
2569 NodeKind::Chunk,
2570 NodeKind::Cluster,
2571 NodeKind::Entity,
2572 ]);
2573 }
2574 let mut out = Vec::new();
2575 for token in raw.split(',') {
2576 let token = token.trim();
2577 if token.is_empty() {
2578 continue;
2579 }
2580 let kind = match token {
2581 "episode" => NodeKind::Episode,
2582 "document" => NodeKind::Document,
2583 "chunk" => NodeKind::Chunk,
2584 "cluster" => NodeKind::Cluster,
2585 "entity" => NodeKind::Entity,
2586 other => {
2587 return Err(ApiError::bad_request(format!(
2588 "unknown node kind {other:?}; expected one of episode/document/chunk/cluster/entity"
2589 )));
2590 }
2591 };
2592 if !out.contains(&kind) {
2593 out.push(kind);
2594 }
2595 }
2596 if out.is_empty() {
2597 return Err(ApiError::bad_request(
2598 "kind filter is empty after parsing; either omit or list at least one kind",
2599 ));
2600 }
2601 Ok(out)
2602}
2603
/// Precomputed edge families served by `/v1/graph/edges`.
///
/// Semantic edges are intentionally absent: `parse_edge_kind_filter` rejects them
/// because they are computed at query time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EdgeKind {
    Triple,
    DocumentChunk,
    ClusterMember,
}

impl EdgeKind {
    /// Rank of this kind in the edges page order, used as the primary pagination
    /// key (`StagingEdge::kind_idx`, `EdgesCursor::kind_idx`).
    fn order_idx(self) -> u8 {
        use EdgeKind::{ClusterMember, DocumentChunk, Triple};
        match self {
            Triple => 0,
            DocumentChunk => 1,
            ClusterMember => 2,
        }
    }
}
2622
2623fn parse_edge_kind_filter(raw: Option<&str>) -> Result<Vec<EdgeKind>, ApiError> {
2624 let raw = raw.unwrap_or("").trim();
2625 if raw.is_empty() {
2626 return Ok(vec![
2629 EdgeKind::Triple,
2630 EdgeKind::DocumentChunk,
2631 EdgeKind::ClusterMember,
2632 ]);
2633 }
2634 let mut out = Vec::new();
2635 for token in raw.split(',') {
2636 let token = token.trim();
2637 if token.is_empty() {
2638 continue;
2639 }
2640 let kind = match token {
2641 "triple" => EdgeKind::Triple,
2642 "document_chunk" => EdgeKind::DocumentChunk,
2643 "cluster_member" => EdgeKind::ClusterMember,
2644 "semantic" => {
2645 return Err(ApiError::bad_request(
2648 "semantic edges are available via /v1/graph/neighbors/:id?kind=semantic, not /v1/graph/edges (semantic edges aren't precomputed; they're query-time HNSW lookups)",
2649 ));
2650 }
2651 other => {
2652 return Err(ApiError::bad_request(format!(
2653 "unknown edge type {other:?}; expected one of triple/document_chunk/cluster_member"
2654 )));
2655 }
2656 };
2657 if !out.contains(&kind) {
2658 out.push(kind);
2659 }
2660 }
2661 if out.is_empty() {
2662 return Err(ApiError::bad_request(
2663 "type filter is empty after parsing; either omit or list at least one type",
2664 ));
2665 }
2666 Ok(out)
2667}
2668
/// Opaque `/v1/graph/nodes` pagination cursor.
///
/// Holds the sort key of the last node on the previous page; nodes strictly after
/// it are returned (see `node_passes_cursor`). Encoded as base64url JSON by
/// `encode_cursor`.
#[derive(Debug, Serialize, Deserialize)]
struct NodesCursor {
    /// Timestamp of the last node returned (primary key, descending).
    ts_ms: i64,
    /// Prefixed id of the last node returned, e.g. `ep:<memory_id>` (tie-breaker).
    id: String,
}

/// Opaque `/v1/graph/edges` pagination cursor.
///
/// Edges strictly after `(kind_idx, sub_id)` are returned (see
/// `edge_passes_cursor`). Encoded as base64url JSON by `encode_cursor`.
#[derive(Debug, Serialize, Deserialize)]
struct EdgesCursor {
    /// `EdgeKind::order_idx` of the cursor edge.
    kind_idx: u8,
    /// Per-kind stable id of the cursor edge.
    sub_id: String,
}
2688
2689fn encode_cursor<T: Serialize>(value: &T) -> Result<String, ApiError> {
2690 use base64::Engine;
2691 let json = serde_json::to_vec(value).map_err(|e| {
2692 ApiError::internal(format!("cursor serialize: {e}"))
2693 })?;
2694 Ok(base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(json))
2695}
2696
2697fn decode_cursor<T: for<'de> Deserialize<'de>>(raw: &str) -> Result<T, ApiError> {
2698 use base64::Engine;
2699 let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
2700 .decode(raw.as_bytes())
2701 .map_err(|e| ApiError::bad_request(format!("cursor: bad base64: {e}")))?;
2702 serde_json::from_slice::<T>(&bytes)
2703 .map_err(|e| ApiError::bad_request(format!("cursor: bad JSON payload: {e}")))
2704}
2705
/// A node plus the key `graph_nodes_handler` sorts and paginates it by.
#[derive(Debug)]
struct StagingNode {
    node: GraphNode,
    /// Primary sort key (descending): the node's timestamp.
    sort_ts_ms: i64,
    /// Tie-breaker (ascending): the prefixed node id, e.g. `ep:<memory_id>`.
    sort_id: String,
}
2715
/// Orders node sort keys `(ts_ms, id)` newest-first, breaking timestamp ties by
/// ascending id.
fn cmp_node_sort_keys(a: (i64, &str), b: (i64, &str)) -> std::cmp::Ordering {
    let (a_ts, a_id) = a;
    let (b_ts, b_id) = b;
    // Reversed operands on the timestamp give descending order.
    b_ts.cmp(&a_ts).then_with(|| a_id.cmp(b_id))
}
2725
2726fn node_passes_cursor(ts_ms: i64, id: &str, cursor: &NodesCursor) -> bool {
2730 cmp_node_sort_keys((ts_ms, id), (cursor.ts_ms, cursor.id.as_str()))
2731 == std::cmp::Ordering::Greater
2732}
2733
/// Raw `episodes` row read by `fetch_episodes_for_nodes`.
#[derive(Debug)]
struct NodeRowEp {
    memory_id: String,
    /// Episode timestamp; used as the node sort key.
    ts_ms: i64,
    content: String,
}
2743
2744fn fetch_episodes_for_nodes(
2745 conn: &rusqlite::Connection,
2746 since_ms: Option<i64>,
2747 until_ms: Option<i64>,
2748 cursor: Option<&NodesCursor>,
2749 limit: i64,
2750) -> rusqlite::Result<Vec<NodeRowEp>> {
2751 let mut sql = String::from(
2752 "SELECT memory_id, ts_ms, content
2753 FROM episodes
2754 WHERE status = 'active'",
2755 );
2756 let mut params: Vec<rusqlite::types::Value> = Vec::new();
2757 if let Some(s) = since_ms {
2758 sql.push_str(" AND ts_ms >= ?");
2759 params.push(s.into());
2760 }
2761 if let Some(u) = until_ms {
2762 sql.push_str(" AND ts_ms <= ?");
2763 params.push(u.into());
2764 }
2765 if let Some(cur) = cursor {
2772 sql.push_str(" AND ts_ms <= ?");
2773 params.push(cur.ts_ms.into());
2774 }
2775 sql.push_str(" ORDER BY ts_ms DESC, memory_id ASC LIMIT ?");
2776 params.push(limit.into());
2777 let mut stmt = conn.prepare(&sql)?;
2778 let rows: Vec<NodeRowEp> = stmt
2779 .query_map(rusqlite::params_from_iter(params), |r| {
2780 Ok(NodeRowEp {
2781 memory_id: r.get(0)?,
2782 ts_ms: r.get(1)?,
2783 content: r.get(2)?,
2784 })
2785 })?
2786 .collect::<rusqlite::Result<Vec<_>>>()?;
2787 Ok(rows)
2788}
2789
/// Raw `documents` row read by `fetch_documents_for_nodes`.
#[derive(Debug)]
struct NodeRowDoc {
    doc_id: String,
    /// Read as nullable (`Option`).
    title: Option<String>,
    /// Read as nullable (`Option`).
    source: Option<String>,
    /// Ingestion timestamp; used as the node sort key.
    ingested_at_ms: i64,
}
2797
2798fn fetch_documents_for_nodes(
2799 conn: &rusqlite::Connection,
2800 since_ms: Option<i64>,
2801 until_ms: Option<i64>,
2802 cursor: Option<&NodesCursor>,
2803 limit: i64,
2804) -> rusqlite::Result<Vec<NodeRowDoc>> {
2805 let mut sql = String::from(
2806 "SELECT doc_id, title, source, ingested_at_ms
2807 FROM documents
2808 WHERE status = 'active'",
2809 );
2810 let mut params: Vec<rusqlite::types::Value> = Vec::new();
2811 if let Some(s) = since_ms {
2812 sql.push_str(" AND ingested_at_ms >= ?");
2813 params.push(s.into());
2814 }
2815 if let Some(u) = until_ms {
2816 sql.push_str(" AND ingested_at_ms <= ?");
2817 params.push(u.into());
2818 }
2819 if let Some(cur) = cursor {
2820 sql.push_str(" AND ingested_at_ms <= ?");
2821 params.push(cur.ts_ms.into());
2822 }
2823 sql.push_str(" ORDER BY ingested_at_ms DESC, doc_id ASC LIMIT ?");
2824 params.push(limit.into());
2825 let mut stmt = conn.prepare(&sql)?;
2826 let rows: Vec<NodeRowDoc> = stmt
2827 .query_map(rusqlite::params_from_iter(params), |r| {
2828 Ok(NodeRowDoc {
2829 doc_id: r.get(0)?,
2830 title: r.get(1)?,
2831 source: r.get(2)?,
2832 ingested_at_ms: r.get(3)?,
2833 })
2834 })?
2835 .collect::<rusqlite::Result<Vec<_>>>()?;
2836 Ok(rows)
2837}
2838
/// Raw `document_chunks` row read by `fetch_chunks_for_nodes`.
#[derive(Debug)]
struct NodeRowChunk {
    chunk_id: String,
    /// Value of the `chunk_index` column.
    chunk_index: i64,
    content: String,
    /// Chunk creation timestamp; used as the node sort key and the node `ts_ms`.
    created_at_ms: i64,
}
2846
2847fn fetch_chunks_for_nodes(
2848 conn: &rusqlite::Connection,
2849 since_ms: Option<i64>,
2850 until_ms: Option<i64>,
2851 cursor: Option<&NodesCursor>,
2852 limit: i64,
2853) -> rusqlite::Result<Vec<NodeRowChunk>> {
2854 let mut sql = String::from(
2857 "SELECT c.chunk_id, c.chunk_index, c.content, c.created_at_ms
2858 FROM document_chunks c
2859 JOIN documents d ON d.doc_id = c.doc_id
2860 WHERE d.status = 'active'",
2861 );
2862 let mut params: Vec<rusqlite::types::Value> = Vec::new();
2863 if let Some(s) = since_ms {
2864 sql.push_str(" AND c.created_at_ms >= ?");
2865 params.push(s.into());
2866 }
2867 if let Some(u) = until_ms {
2868 sql.push_str(" AND c.created_at_ms <= ?");
2869 params.push(u.into());
2870 }
2871 if let Some(cur) = cursor {
2872 sql.push_str(" AND c.created_at_ms <= ?");
2873 params.push(cur.ts_ms.into());
2874 }
2875 sql.push_str(" ORDER BY c.created_at_ms DESC, c.chunk_id ASC LIMIT ?");
2876 params.push(limit.into());
2877 let mut stmt = conn.prepare(&sql)?;
2878 let rows: Vec<NodeRowChunk> = stmt
2879 .query_map(rusqlite::params_from_iter(params), |r| {
2880 Ok(NodeRowChunk {
2881 chunk_id: r.get(0)?,
2882 chunk_index: r.get(1)?,
2883 content: r.get(2)?,
2884 created_at_ms: r.get(3)?,
2885 })
2886 })?
2887 .collect::<rusqlite::Result<Vec<_>>>()?;
2888 Ok(rows)
2889}
2890
/// Raw `clusters` row read by `fetch_clusters_for_nodes`.
#[derive(Debug)]
struct NodeRowCluster {
    cluster_id: String,
    /// `content` of the LEFT-JOINed `semantic_abstractions` row; `None` when no
    /// abstraction row joins.
    abstraction: Option<String>,
    /// Cluster creation timestamp; used as the node sort key.
    created_at_ms: i64,
}
2897
2898fn fetch_clusters_for_nodes(
2899 conn: &rusqlite::Connection,
2900 since_ms: Option<i64>,
2901 until_ms: Option<i64>,
2902 cursor: Option<&NodesCursor>,
2903 limit: i64,
2904) -> rusqlite::Result<Vec<NodeRowCluster>> {
2905 let mut sql = String::from(
2908 "SELECT c.cluster_id, sa.content, c.created_at_ms
2909 FROM clusters c
2910 LEFT JOIN semantic_abstractions sa ON sa.cluster_id = c.cluster_id
2911 WHERE 1=1",
2912 );
2913 let mut params: Vec<rusqlite::types::Value> = Vec::new();
2914 if let Some(s) = since_ms {
2915 sql.push_str(" AND c.created_at_ms >= ?");
2916 params.push(s.into());
2917 }
2918 if let Some(u) = until_ms {
2919 sql.push_str(" AND c.created_at_ms <= ?");
2920 params.push(u.into());
2921 }
2922 if let Some(cur) = cursor {
2923 sql.push_str(" AND c.created_at_ms <= ?");
2924 params.push(cur.ts_ms.into());
2925 }
2926 sql.push_str(" ORDER BY c.created_at_ms DESC, c.cluster_id ASC LIMIT ?");
2927 params.push(limit.into());
2928 let mut stmt = conn.prepare(&sql)?;
2929 let rows: Vec<NodeRowCluster> = stmt
2930 .query_map(rusqlite::params_from_iter(params), |r| {
2931 Ok(NodeRowCluster {
2932 cluster_id: r.get(0)?,
2933 abstraction: r.get(1)?,
2934 created_at_ms: r.get(2)?,
2935 })
2936 })?
2937 .collect::<rusqlite::Result<Vec<_>>>()?;
2938 Ok(rows)
2939}
2940
/// One aggregated entity row read by `fetch_entities_for_nodes`.
#[derive(Debug)]
struct NodeRowEntity {
    /// Entity value as it appears in `triples.subject_id` / `triples.object_id`.
    value: String,
    /// Number of active triple endpoints (subject or object) referencing it.
    ref_count: i64,
    /// Earliest `valid_from_ms` among those references; used as the node sort key.
    first_seen_ms: i64,
}
2947
2948fn fetch_entities_for_nodes(
2957 conn: &rusqlite::Connection,
2958 since_ms: Option<i64>,
2959 until_ms: Option<i64>,
2960 cursor: Option<&NodesCursor>,
2961) -> rusqlite::Result<(Vec<NodeRowEntity>, bool)> {
2962 let mut sql = String::from(
2967 "WITH all_refs AS (
2968 SELECT subject_id AS value, valid_from_ms AS ts_ms FROM triples WHERE status = 'active'
2969 UNION ALL
2970 SELECT object_id AS value, valid_from_ms AS ts_ms FROM triples WHERE status = 'active'
2971 )
2972 SELECT value, COUNT(*) AS ref_count, MIN(ts_ms) AS first_seen_ms
2973 FROM all_refs
2974 WHERE 1=1",
2975 );
2976 let mut params: Vec<rusqlite::types::Value> = Vec::new();
2977 if let Some(s) = since_ms {
2978 sql.push_str(" AND ts_ms >= ?");
2979 params.push(s.into());
2980 }
2981 if let Some(u) = until_ms {
2982 sql.push_str(" AND ts_ms <= ?");
2983 params.push(u.into());
2984 }
2985 sql.push_str(" GROUP BY value");
2989 if let Some(ts) = cursor.map(|c| c.ts_ms) {
2990 sql.push_str(" HAVING MIN(ts_ms) <= ?");
2991 params.push(ts.into());
2992 }
2993 let want = GRAPH_ENTITY_CAP as i64 + 1;
2995 sql.push_str(" ORDER BY ref_count DESC, value ASC LIMIT ?");
2996 params.push(want.into());
2997 let mut stmt = conn.prepare(&sql)?;
2998 let rows: Vec<NodeRowEntity> = stmt
2999 .query_map(rusqlite::params_from_iter(params), |r| {
3000 Ok(NodeRowEntity {
3001 value: r.get(0)?,
3002 ref_count: r.get(1)?,
3003 first_seen_ms: r.get(2)?,
3004 })
3005 })?
3006 .collect::<rusqlite::Result<Vec<_>>>()?;
3007 let cap_reached = rows.len() > GRAPH_ENTITY_CAP;
3008 let mut trimmed = rows;
3009 if cap_reached {
3010 trimmed.truncate(GRAPH_ENTITY_CAP);
3011 }
3012 Ok((trimmed, cap_reached))
3013}
3014
3015async fn graph_nodes_handler(
3018 TenantExtractor(tenant): TenantExtractor,
3019 Query(q): Query<GraphNodesQuery>,
3020) -> Result<Response, ApiError> {
3021 let limit = q.limit.unwrap_or(GRAPH_NODES_DEFAULT_LIMIT);
3022 let limit = limit.clamp(1, GRAPH_NODES_MAX_LIMIT);
3023 let kinds = parse_node_kind_filter(q.kind.as_deref())?;
3024 let since_ms = q.since_ms;
3025 let until_ms = q.until_ms;
3026 if let (Some(s), Some(u)) = (since_ms, until_ms) {
3027 if s > u {
3028 return Err(ApiError::bad_request(format!(
3029 "since_ms ({s}) must be <= until_ms ({u})"
3030 )));
3031 }
3032 }
3033 let cursor = match q.cursor.as_deref() {
3034 None => None,
3035 Some("") => None,
3036 Some(raw) => Some(decode_cursor::<NodesCursor>(raw)?),
3037 };
3038 let want_episode = kinds.contains(&NodeKind::Episode);
3039 let want_document = kinds.contains(&NodeKind::Document);
3040 let want_chunk = kinds.contains(&NodeKind::Chunk);
3041 let want_cluster = kinds.contains(&NodeKind::Cluster);
3042 let want_entity = kinds.contains(&NodeKind::Entity);
3043
3044 let per_kind_limit = (limit as i64).saturating_add(2);
3053 let tenant_id_for_blocking = tenant.tenant_id().to_string();
3054 let cursor_clone = cursor.as_ref().map(|c| NodesCursor {
3055 ts_ms: c.ts_ms,
3056 id: c.id.clone(),
3057 });
3058
3059 let (mut staged, cap_reached) = tenant
3060 .read()
3061 .interact(move |conn| {
3062 let mut staged: Vec<StagingNode> = Vec::new();
3063 let mut cap_reached = false;
3064 let cursor_ref = cursor_clone.as_ref();
3065
3066 if want_episode {
3067 let eps = fetch_episodes_for_nodes(conn, since_ms, until_ms, cursor_ref, per_kind_limit)?;
3068 for ep in eps {
3069 let id = format!("ep:{}", ep.memory_id);
3070 let exp = ExpandedEpisode {
3071 memory_id: ep.memory_id,
3072 ts_ms: ep.ts_ms,
3073 content: ep.content,
3074 };
3075 let node = graph_node_for_episode(&tenant_id_for_blocking, &exp);
3076 staged.push(StagingNode {
3077 sort_ts_ms: ep.ts_ms,
3078 sort_id: id.clone(),
3079 node,
3080 });
3081 }
3082 }
3083 if want_document {
3084 let docs = fetch_documents_for_nodes(conn, since_ms, until_ms, cursor_ref, per_kind_limit)?;
3085 for d in docs {
3086 let id = format!("doc:{}", d.doc_id);
3087 let exp = ExpandedDocument {
3088 doc_id: d.doc_id,
3089 title: d.title,
3090 source: d.source,
3091 ingested_at_ms: d.ingested_at_ms,
3092 };
3093 let node = graph_node_for_document(&tenant_id_for_blocking, &exp);
3094 staged.push(StagingNode {
3095 sort_ts_ms: d.ingested_at_ms,
3096 sort_id: id.clone(),
3097 node,
3098 });
3099 }
3100 }
3101 if want_chunk {
3102 let chunks = fetch_chunks_for_nodes(conn, since_ms, until_ms, cursor_ref, per_kind_limit)?;
3103 for c in chunks {
3104 let id = format!("chunk:{}", c.chunk_id);
3105 let exp = ExpandedChunk {
3106 chunk_id: c.chunk_id,
3107 chunk_index: c.chunk_index,
3108 content: c.content,
3109 };
3110 let mut node = graph_node_for_chunk(&tenant_id_for_blocking, &exp);
3115 node.ts_ms = Some(c.created_at_ms);
3116 staged.push(StagingNode {
3117 sort_ts_ms: c.created_at_ms,
3118 sort_id: id.clone(),
3119 node,
3120 });
3121 }
3122 }
3123 if want_cluster {
3124 let cls = fetch_clusters_for_nodes(conn, since_ms, until_ms, cursor_ref, per_kind_limit)?;
3125 for c in cls {
3126 let id = format!("cl:{}", c.cluster_id);
3127 let node = graph_node_for_cluster(
3128 &tenant_id_for_blocking,
3129 &c.cluster_id,
3130 c.abstraction.as_deref(),
3131 c.created_at_ms,
3132 );
3133 staged.push(StagingNode {
3134 sort_ts_ms: c.created_at_ms,
3135 sort_id: id.clone(),
3136 node,
3137 });
3138 }
3139 }
3140 if want_entity {
3141 let (ents, was_cap_reached) =
3142 fetch_entities_for_nodes(conn, since_ms, until_ms, cursor_ref)?;
3143 cap_reached = was_cap_reached;
3144 for e in ents {
3145 let id = format!("ent:{}", e.value);
3146 let mut node = graph_node_for_entity(&tenant_id_for_blocking, &e.value);
3147 node.ts_ms = Some(e.first_seen_ms);
3148 node.preview =
3149 Some(format!("Referenced in {} triples", e.ref_count));
3150 staged.push(StagingNode {
3151 sort_ts_ms: e.first_seen_ms,
3152 sort_id: id.clone(),
3153 node,
3154 });
3155 }
3156 }
3157 Ok::<_, rusqlite::Error>((staged, cap_reached))
3158 })
3159 .await
3160 .map_err(ApiError::from)?;
3161
3162 if let Some(cur) = &cursor {
3164 staged.retain(|s| node_passes_cursor(s.sort_ts_ms, &s.sort_id, cur));
3165 }
3166
3167 staged.sort_by(|a, b| {
3169 cmp_node_sort_keys((a.sort_ts_ms, &a.sort_id), (b.sort_ts_ms, &b.sort_id))
3170 });
3171
3172 let limit_us = limit as usize;
3174 let next_cursor = if staged.len() > limit_us {
3175 let last = &staged[limit_us - 1];
3176 Some(NodesCursor {
3177 ts_ms: last.sort_ts_ms,
3178 id: last.sort_id.clone(),
3179 })
3180 } else {
3181 None
3182 };
3183 staged.truncate(limit_us);
3184
3185 let next_cursor_str = match next_cursor {
3186 Some(c) => Some(encode_cursor(&c)?),
3187 None => None,
3188 };
3189
3190 let nodes: Vec<GraphNode> = staged.into_iter().map(|s| s.node).collect();
3191 let payload = GraphNodesResponse {
3192 nodes,
3193 next_cursor: next_cursor_str,
3194 };
3195
3196 let mut response = Json(payload).into_response();
3199 if cap_reached {
3200 response
3201 .headers_mut()
3202 .insert(ENTITY_CAP_HEADER, HeaderValue::from_static("true"));
3203 }
3204 Ok(response)
3205}
3206
/// An edge plus its pagination key in `graph_edges_handler`.
#[derive(Debug)]
struct StagingEdge {
    edge: GraphEdge,
    /// `EdgeKind::order_idx` of the edge's kind (primary key, ascending).
    kind_idx: u8,
    /// Per-kind stable id (secondary key, ascending). It is one of:
    /// * the triple id,
    /// * the chunk id,
    /// * `cluster_id\u{1f}memory_id`.
    sub_id: String,
}
3215
/// Orders edge sort keys `(kind_idx, sub_id)` ascending on both components.
fn cmp_edge_sort_keys(a: (u8, &str), b: (u8, &str)) -> std::cmp::Ordering {
    // Tuples compare lexicographically: kind index first, then sub-id.
    a.cmp(&b)
}
3222
3223fn edge_passes_cursor(kind_idx: u8, sub_id: &str, cursor: &EdgesCursor) -> bool {
3224 cmp_edge_sort_keys((kind_idx, sub_id), (cursor.kind_idx, cursor.sub_id.as_str()))
3225 == std::cmp::Ordering::Greater
3226}
3227
3228fn edge_touches_focus(
3232 kind: EdgeKind,
3233 focus_kind: NodeKind,
3234 focus_value: &str,
3235 src_value: &str,
3236 tgt_value: &str,
3237 extra_value: Option<&str>,
3238) -> bool {
3239 match kind {
3242 EdgeKind::Triple => match focus_kind {
3243 NodeKind::Episode => src_value == focus_value,
3248 NodeKind::Entity => {
3249 tgt_value == focus_value
3250 || extra_value.map(|x| x == focus_value).unwrap_or(false)
3251 || src_value == focus_value
3252 }
3253 _ => false,
3254 },
3255 EdgeKind::DocumentChunk => match focus_kind {
3256 NodeKind::Document => src_value == focus_value,
3257 NodeKind::Chunk => tgt_value == focus_value,
3258 _ => false,
3259 },
3260 EdgeKind::ClusterMember => match focus_kind {
3261 NodeKind::Cluster => src_value == focus_value,
3262 NodeKind::Episode => tgt_value == focus_value,
3263 _ => false,
3264 },
3265 }
3266}
3267
/// One active triple row for the edges endpoint, read by `fetch_triple_edges`.
#[derive(Debug)]
struct EdgeRowTriple {
    /// Stable id; used as the pagination sub-key.
    triple_id: String,
    /// `memory_id` of the source episode, or `None` when no episode row was
    /// joined. The caller skips `None` rows.
    source_memory_id: Option<String>,
    /// Raw object entity value (edge target `ent:<object_id>`).
    object_id: String,
    predicate: String,
    /// Surfaced as the edge `weight`.
    confidence: f32,
}
3276
3277fn fetch_triple_edges(conn: &rusqlite::Connection) -> rusqlite::Result<Vec<EdgeRowTriple>> {
3278 let safety_cap = (GRAPH_EDGES_MAX_LIMIT as i64) * 4;
3284 let mut stmt = conn.prepare(
3285 "SELECT t.triple_id, e.memory_id, t.object_id, t.predicate, t.confidence
3286 FROM triples t
3287 LEFT JOIN episodes e ON e.rowid = t.source_episode_id
3288 WHERE t.status = 'active'
3289 ORDER BY t.triple_id ASC
3290 LIMIT ?1",
3291 )?;
3292 let rows: Vec<EdgeRowTriple> = stmt
3293 .query_map(rusqlite::params![safety_cap], |r| {
3294 Ok(EdgeRowTriple {
3295 triple_id: r.get(0)?,
3296 source_memory_id: r.get::<_, Option<String>>(1)?,
3297 object_id: r.get(2)?,
3298 predicate: r.get(3)?,
3299 confidence: r.get(4)?,
3300 })
3301 })?
3302 .collect::<rusqlite::Result<Vec<_>>>()?;
3303 Ok(rows)
3304}
3305
/// One `document -> chunk` membership row, read by `fetch_document_chunk_edges`.
#[derive(Debug)]
struct EdgeRowDocChunk {
    /// Edge target (`chunk:<chunk_id>`) and pagination sub-key.
    chunk_id: String,
    /// Edge source (`doc:<doc_id>`).
    doc_id: String,
}
3311
3312fn fetch_document_chunk_edges(
3313 conn: &rusqlite::Connection,
3314) -> rusqlite::Result<Vec<EdgeRowDocChunk>> {
3315 let safety_cap = (GRAPH_EDGES_MAX_LIMIT as i64) * 4;
3316 let mut stmt = conn.prepare(
3317 "SELECT c.chunk_id, c.doc_id
3318 FROM document_chunks c
3319 JOIN documents d ON d.doc_id = c.doc_id
3320 WHERE d.status = 'active'
3321 ORDER BY c.chunk_id ASC
3322 LIMIT ?1",
3323 )?;
3324 let rows: Vec<EdgeRowDocChunk> = stmt
3325 .query_map(rusqlite::params![safety_cap], |r| {
3326 Ok(EdgeRowDocChunk {
3327 chunk_id: r.get(0)?,
3328 doc_id: r.get(1)?,
3329 })
3330 })?
3331 .collect::<rusqlite::Result<Vec<_>>>()?;
3332 Ok(rows)
3333}
3334
/// One `cluster -> episode` membership row, read by `fetch_cluster_member_edges`.
#[derive(Debug)]
struct EdgeRowClusterMember {
    /// Edge source (`cl:<cluster_id>`).
    cluster_id: String,
    /// Edge target (`ep:<memory_id>`).
    memory_id: String,
}
3340
3341fn fetch_cluster_member_edges(
3342 conn: &rusqlite::Connection,
3343) -> rusqlite::Result<Vec<EdgeRowClusterMember>> {
3344 let safety_cap = (GRAPH_EDGES_MAX_LIMIT as i64) * 4;
3345 let mut stmt = conn.prepare(
3346 "SELECT ce.cluster_id, ce.memory_id
3347 FROM cluster_episodes ce
3348 JOIN episodes e ON e.memory_id = ce.memory_id
3349 WHERE e.status = 'active'
3350 ORDER BY ce.cluster_id ASC, ce.memory_id ASC
3351 LIMIT ?1",
3352 )?;
3353 let rows: Vec<EdgeRowClusterMember> = stmt
3354 .query_map(rusqlite::params![safety_cap], |r| {
3355 Ok(EdgeRowClusterMember {
3356 cluster_id: r.get(0)?,
3357 memory_id: r.get(1)?,
3358 })
3359 })?
3360 .collect::<rusqlite::Result<Vec<_>>>()?;
3361 Ok(rows)
3362}
3363
/// `GET` handler listing graph edges with cursor-based pagination.
///
/// Query handling:
/// - `limit` defaults to `GRAPH_EDGES_DEFAULT_LIMIT` and is clamped into
///   `[1, GRAPH_EDGES_MAX_LIMIT]`.
/// - `type` selects which edge kinds to include (parsed by
///   `parse_edge_kind_filter`).
/// - `cursor` (absent or empty string = first page) is decoded into an
///   `EdgesCursor`.
/// - `node_id`, when present, keeps only edges for which `edge_touches_focus`
///   returns `true` for that node.
///
/// All matching edges of the requested kinds are loaded in one read
/// interaction, then filtered by the cursor, sorted by `(kind_idx, sub_id)`
/// and cut to `limit`. If more edges remained, `next_cursor` encodes the sort
/// key of the last edge returned.
///
/// # Errors
/// Propagates errors from the query parsers, the cursor codec and the storage
/// layer (via `ApiError::from`).
async fn graph_edges_handler(
    TenantExtractor(tenant): TenantExtractor,
    Query(q): Query<GraphEdgesQuery>,
) -> Result<Json<GraphEdgesResponse>, ApiError> {
    // Clamping to >= 1 also guarantees `limit_us - 1` below cannot underflow.
    let limit = q.limit.unwrap_or(GRAPH_EDGES_DEFAULT_LIMIT);
    let limit = limit.clamp(1, GRAPH_EDGES_MAX_LIMIT);
    let kinds = parse_edge_kind_filter(q.r#type.as_deref())?;
    // `cursor=` (empty string) is treated the same as no cursor.
    let cursor = match q.cursor.as_deref() {
        None => None,
        Some("") => None,
        Some(raw) => Some(decode_cursor::<EdgesCursor>(raw)?),
    };

    // Owned copy of the focus so it can be moved into the `interact` closure.
    let focus = match q.node_id.as_deref() {
        None => None,
        Some(raw) => {
            let (kind, value) = parse_node_id(raw)?;
            Some((kind, value.to_string()))
        }
    };

    let want_triple = kinds.contains(&EdgeKind::Triple);
    let want_doc_chunk = kinds.contains(&EdgeKind::DocumentChunk);
    let want_cluster_member = kinds.contains(&EdgeKind::ClusterMember);

    let staged: Vec<StagingEdge> = tenant
        .read()
        .interact(move |conn| {
            let mut staged: Vec<StagingEdge> = Vec::new();

            // Triple edges: `ep:{source_memory_id}` -> `ent:{object_id}`,
            // sorted within their kind by `triple_id`.
            if want_triple {
                for t in fetch_triple_edges(conn)? {
                    // A triple without a source episode has no `ep:` endpoint
                    // and is not emitted.
                    let src_id = match &t.source_memory_id {
                        Some(mid) => format!("ep:{mid}"),
                        None => continue,
                    };
                    let tgt_id = format!("ent:{}", t.object_id);
                    if let Some((fk, fv)) = &focus {
                        if !edge_touches_focus(
                            EdgeKind::Triple,
                            *fk,
                            fv,
                            t.source_memory_id
                                .as_deref()
                                .unwrap_or(""),
                            &t.object_id,
                            None,
                        ) {
                            continue;
                        }
                    }
                    let edge = GraphEdge {
                        id: edge_id(&src_id, "triple", &tgt_id),
                        source: src_id,
                        target: tgt_id,
                        kind: "triple",
                        predicate: Some(t.predicate),
                        weight: Some(t.confidence),
                    };
                    staged.push(StagingEdge {
                        edge,
                        kind_idx: EdgeKind::Triple.order_idx(),
                        sub_id: t.triple_id,
                    });
                }
            }
            // Document -> chunk edges: `doc:{doc_id}` -> `chunk:{chunk_id}`,
            // sorted within their kind by `chunk_id`.
            if want_doc_chunk {
                for dc in fetch_document_chunk_edges(conn)? {
                    let src_id = format!("doc:{}", dc.doc_id);
                    let tgt_id = format!("chunk:{}", dc.chunk_id);
                    if let Some((fk, fv)) = &focus {
                        if !edge_touches_focus(
                            EdgeKind::DocumentChunk,
                            *fk,
                            fv,
                            &dc.doc_id,
                            &dc.chunk_id,
                            None,
                        ) {
                            continue;
                        }
                    }
                    let edge = GraphEdge {
                        id: edge_id(&src_id, "document_chunk", &tgt_id),
                        source: src_id,
                        target: tgt_id,
                        kind: "document_chunk",
                        predicate: None,
                        weight: None,
                    };
                    staged.push(StagingEdge {
                        edge,
                        kind_idx: EdgeKind::DocumentChunk.order_idx(),
                        sub_id: dc.chunk_id,
                    });
                }
            }
            // Cluster membership edges: `cl:{cluster_id}` -> `ep:{memory_id}`.
            if want_cluster_member {
                for cm in fetch_cluster_member_edges(conn)? {
                    let src_id = format!("cl:{}", cm.cluster_id);
                    let tgt_id = format!("ep:{}", cm.memory_id);
                    if let Some((fk, fv)) = &focus {
                        if !edge_touches_focus(
                            EdgeKind::ClusterMember,
                            *fk,
                            fv,
                            &cm.cluster_id,
                            &cm.memory_id,
                            None,
                        ) {
                            continue;
                        }
                    }
                    let edge = GraphEdge {
                        id: edge_id(&src_id, "cluster_member", &tgt_id),
                        source: src_id,
                        target: tgt_id,
                        kind: "cluster_member",
                        predicate: None,
                        weight: None,
                    };
                    // Composite sort key: cluster id and memory id joined by
                    // U+001F (unit separator).
                    let sub_id = format!("{}\u{1f}{}", cm.cluster_id, cm.memory_id);
                    staged.push(StagingEdge {
                        edge,
                        kind_idx: EdgeKind::ClusterMember.order_idx(),
                        sub_id,
                    });
                }
            }
            Ok::<_, rusqlite::Error>(staged)
        })
        .await
        .map_err(ApiError::from)?;

    // Rebind mutably for in-place cursor filtering, sorting and truncation.
    let mut staged = staged;
    // Drop everything `edge_passes_cursor` rejects for this cursor
    // (presumably edges at or before the cursor's sort key -- confirm).
    if let Some(cur) = &cursor {
        staged.retain(|s| edge_passes_cursor(s.kind_idx, &s.sub_id, cur));
    }

    // Sort by kind order first, then by the per-kind sub id.
    staged.sort_by(|a, b| {
        cmp_edge_sort_keys((a.kind_idx, &a.sub_id), (b.kind_idx, &b.sub_id))
    });

    let limit_us = limit as usize;
    // Only emit a cursor when at least one edge is left beyond this page; the
    // cursor points at the last edge that IS returned.
    let next_cursor = if staged.len() > limit_us {
        let last = &staged[limit_us - 1];
        Some(EdgesCursor {
            kind_idx: last.kind_idx,
            sub_id: last.sub_id.clone(),
        })
    } else {
        None
    };
    staged.truncate(limit_us);
    let next_cursor_str = match next_cursor {
        Some(c) => Some(encode_cursor(&c)?),
        None => None,
    };

    let edges: Vec<GraphEdge> = staged.into_iter().map(|s| s.edge).collect();
    Ok(Json(GraphEdgesResponse {
        edges,
        next_cursor: next_cursor_str,
    }))
}
3541
/// Maximum number of triples returned when inspecting an entity node; bound
/// as the `LIMIT` of the entity-triples query in `inspect_entity_node`.
const GRAPH_INSPECT_ENTITY_TRIPLES_CAP: i64 = 50;
3594
/// Response body of the graph node inspect endpoint.
#[derive(Debug, Serialize)]
struct GraphInspectResponse {
    /// The inspected node.
    node: GraphNode,
    /// Full text of the node when it has any; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    full_text: Option<String>,
    /// Triple edges pointing at the node. Always empty in the current inspect
    /// helpers.
    triples_in: Vec<GraphEdge>,
    /// Triple edges whose `source` is the inspected node (populated for
    /// episode and entity nodes).
    triples_out: Vec<GraphEdge>,
}
3603
3604async fn graph_inspect_handler(
3606 TenantExtractor(tenant): TenantExtractor,
3607 Path(id): Path<String>,
3608) -> Result<Json<GraphInspectResponse>, ApiError> {
3609 let (kind, value) = parse_node_id(&id)?;
3610 let tenant_id_str = tenant.tenant_id().to_string();
3611 let value = value.to_string();
3612 let node_id_full = id;
3613 match kind {
3614 NodeKind::Episode => {
3615 inspect_episode_node(&tenant, &tenant_id_str, value, node_id_full).await
3616 }
3617 NodeKind::Document => {
3618 inspect_document_node(&tenant, &tenant_id_str, value, node_id_full).await
3619 }
3620 NodeKind::Chunk => {
3621 inspect_chunk_node(&tenant, &tenant_id_str, value, node_id_full).await
3622 }
3623 NodeKind::Cluster => {
3624 inspect_cluster_node(&tenant, &tenant_id_str, value, node_id_full).await
3625 }
3626 NodeKind::Entity => {
3627 inspect_entity_node(&tenant, &tenant_id_str, value, node_id_full).await
3628 }
3629 }
3630 .map(Json)
3631}
3632
3633async fn inspect_episode_node(
3636 tenant: &TenantHandle,
3637 tenant_id: &str,
3638 memory_id: String,
3639 node_id_full: String,
3640) -> Result<GraphInspectResponse, ApiError> {
3641 let memory_id_for_err = memory_id.clone();
3642 let memory_id_q = memory_id.clone();
3643 let fetched: Option<(ExpandedEpisode, Vec<TripleRow>)> = tenant
3646 .read()
3647 .interact(move |conn| {
3648 let ep_row: Option<(i64, i64, String)> = conn
3649 .query_row(
3650 "SELECT rowid, ts_ms, content
3651 FROM episodes
3652 WHERE memory_id = ?1
3653 AND status = 'active'",
3654 rusqlite::params![&memory_id_q],
3655 |r| {
3656 Ok((
3657 r.get::<_, i64>(0)?,
3658 r.get::<_, i64>(1)?,
3659 r.get::<_, String>(2)?,
3660 ))
3661 },
3662 )
3663 .map(Some)
3664 .or_else(|e| match e {
3665 rusqlite::Error::QueryReturnedNoRows => Ok(None),
3666 other => Err(other),
3667 })?;
3668 let Some((rowid, ts_ms, content)) = ep_row else {
3669 return Ok(None);
3670 };
3671 let mut stmt = conn.prepare(
3672 "SELECT subject_id, predicate, object_id, confidence
3673 FROM triples
3674 WHERE source_episode_id = ?1
3675 AND status = 'active'
3676 ORDER BY valid_from_ms DESC",
3677 )?;
3678 let triples = stmt
3679 .query_map(rusqlite::params![rowid], |r| {
3680 Ok(TripleRow {
3681 subject_id: r.get(0)?,
3682 predicate: r.get(1)?,
3683 object_id: r.get(2)?,
3684 confidence: r.get(3)?,
3685 })
3686 })?
3687 .collect::<rusqlite::Result<Vec<_>>>()?;
3688 let ep = ExpandedEpisode {
3689 memory_id: memory_id_q,
3690 ts_ms,
3691 content,
3692 };
3693 Ok::<_, rusqlite::Error>(Some((ep, triples)))
3694 })
3695 .await
3696 .map_err(ApiError::from)?;
3697
3698 let (ep, triples) = fetched.ok_or_else(|| {
3699 ApiError::not_found(format!(
3700 "node_id {node_id_full:?} (memory_id {memory_id_for_err}) not found in current tenant"
3701 ))
3702 })?;
3703
3704 let node = graph_node_for_episode(tenant_id, &ep);
3705 let full_text = Some(ep.content.clone());
3706 let mut triples_out = Vec::with_capacity(triples.len());
3711 for t in triples {
3712 let tgt_id = format!("ent:{}", t.object_id);
3713 triples_out.push(GraphEdge {
3714 id: edge_id(&node_id_full, "triple", &tgt_id),
3715 source: node_id_full.clone(),
3716 target: tgt_id,
3717 kind: "triple",
3718 predicate: Some(t.predicate),
3719 weight: Some(t.confidence),
3720 });
3721 }
3722 Ok(GraphInspectResponse {
3723 node,
3724 full_text,
3725 triples_in: Vec::new(),
3726 triples_out,
3727 })
3728}
3729
3730async fn inspect_document_node(
3731 tenant: &TenantHandle,
3732 tenant_id: &str,
3733 doc_id: String,
3734 node_id_full: String,
3735) -> Result<GraphInspectResponse, ApiError> {
3736 let doc_id_for_err = doc_id.clone();
3737 let doc_id_q = doc_id.clone();
3738 let fetched: Option<(ExpandedDocument, Vec<String>)> = tenant
3744 .read()
3745 .interact(move |conn| {
3746 let doc_row: Option<ExpandedDocument> = conn
3747 .query_row(
3748 "SELECT doc_id, title, source, ingested_at_ms
3749 FROM documents
3750 WHERE doc_id = ?1
3751 AND status = 'active'",
3752 rusqlite::params![&doc_id_q],
3753 |r| {
3754 Ok(ExpandedDocument {
3755 doc_id: r.get(0)?,
3756 title: r.get(1)?,
3757 source: r.get(2)?,
3758 ingested_at_ms: r.get(3)?,
3759 })
3760 },
3761 )
3762 .map(Some)
3763 .or_else(|e| match e {
3764 rusqlite::Error::QueryReturnedNoRows => Ok(None),
3765 other => Err(other),
3766 })?;
3767 let Some(doc) = doc_row else {
3768 return Ok(None);
3769 };
3770 let mut stmt = conn.prepare(
3771 "SELECT content
3772 FROM document_chunks
3773 WHERE doc_id = ?1
3774 ORDER BY chunk_index ASC",
3775 )?;
3776 let chunks = stmt
3777 .query_map(rusqlite::params![&doc_id_q], |r| r.get::<_, String>(0))?
3778 .collect::<rusqlite::Result<Vec<_>>>()?;
3779 Ok::<_, rusqlite::Error>(Some((doc, chunks)))
3780 })
3781 .await
3782 .map_err(ApiError::from)?;
3783
3784 let (doc, chunks) = fetched.ok_or_else(|| {
3785 ApiError::not_found(format!(
3786 "node_id {node_id_full:?} (doc_id {doc_id_for_err}) not found in current tenant"
3787 ))
3788 })?;
3789
3790 let full_text = if chunks.is_empty() {
3791 None
3795 } else {
3796 Some(chunks.join("\n\n"))
3797 };
3798
3799 Ok(GraphInspectResponse {
3800 node: graph_node_for_document(tenant_id, &doc),
3801 full_text,
3802 triples_in: Vec::new(),
3803 triples_out: Vec::new(),
3804 })
3805}
3806
3807async fn inspect_chunk_node(
3808 tenant: &TenantHandle,
3809 tenant_id: &str,
3810 chunk_id: String,
3811 node_id_full: String,
3812) -> Result<GraphInspectResponse, ApiError> {
3813 let chunk_id_for_err = chunk_id.clone();
3814 let chunk_id_q = chunk_id.clone();
3815 let row: Option<(ExpandedChunk, i64)> = tenant
3816 .read()
3817 .interact(move |conn| {
3818 conn.query_row(
3819 "SELECT c.chunk_id, c.chunk_index, c.content, c.created_at_ms
3820 FROM document_chunks c
3821 JOIN documents d ON d.doc_id = c.doc_id
3822 WHERE c.chunk_id = ?1
3823 AND d.status = 'active'",
3824 rusqlite::params![&chunk_id_q],
3825 |r| {
3826 Ok((
3827 ExpandedChunk {
3828 chunk_id: r.get(0)?,
3829 chunk_index: r.get(1)?,
3830 content: r.get(2)?,
3831 },
3832 r.get::<_, i64>(3)?,
3833 ))
3834 },
3835 )
3836 .map(Some)
3837 .or_else(|e| match e {
3838 rusqlite::Error::QueryReturnedNoRows => Ok(None),
3839 other => Err(other),
3840 })
3841 })
3842 .await
3843 .map_err(ApiError::from)?;
3844
3845 let (chunk, created_at_ms) = row.ok_or_else(|| {
3846 ApiError::not_found(format!(
3847 "node_id {node_id_full:?} (chunk_id {chunk_id_for_err}) not found in current tenant"
3848 ))
3849 })?;
3850
3851 let full_text = Some(chunk.content.clone());
3852 let mut node = graph_node_for_chunk(tenant_id, &chunk);
3853 node.ts_ms = Some(created_at_ms);
3856
3857 Ok(GraphInspectResponse {
3858 node,
3859 full_text,
3860 triples_in: Vec::new(),
3861 triples_out: Vec::new(),
3862 })
3863}
3864
3865async fn inspect_cluster_node(
3866 tenant: &TenantHandle,
3867 tenant_id: &str,
3868 cluster_id: String,
3869 node_id_full: String,
3870) -> Result<GraphInspectResponse, ApiError> {
3871 let cluster_id_for_err = cluster_id.clone();
3872 let cluster_id_q = cluster_id.clone();
3873 let row: Option<(Option<String>, i64)> = tenant
3874 .read()
3875 .interact(move |conn| {
3876 conn.query_row(
3877 "SELECT sa.content, c.created_at_ms
3878 FROM clusters c
3879 LEFT JOIN semantic_abstractions sa ON sa.cluster_id = c.cluster_id
3880 WHERE c.cluster_id = ?1",
3881 rusqlite::params![&cluster_id_q],
3882 |r| Ok((r.get::<_, Option<String>>(0)?, r.get::<_, i64>(1)?)),
3883 )
3884 .map(Some)
3885 .or_else(|e| match e {
3886 rusqlite::Error::QueryReturnedNoRows => Ok(None),
3887 other => Err(other),
3888 })
3889 })
3890 .await
3891 .map_err(ApiError::from)?;
3892
3893 let (abstraction, created_at_ms) = row.ok_or_else(|| {
3894 ApiError::not_found(format!(
3895 "node_id {node_id_full:?} (cluster_id {cluster_id_for_err}) not found in current tenant"
3896 ))
3897 })?;
3898
3899 let full_text = match abstraction.as_deref() {
3904 Some(a) => Some(format!("cluster {cluster_id_for_err}\n\n{a}")),
3905 None => Some(format!("cluster {cluster_id_for_err}")),
3906 };
3907
3908 Ok(GraphInspectResponse {
3909 node: graph_node_for_cluster(
3910 tenant_id,
3911 &cluster_id_for_err,
3912 abstraction.as_deref(),
3913 created_at_ms,
3914 ),
3915 full_text,
3916 triples_in: Vec::new(),
3917 triples_out: Vec::new(),
3918 })
3919}
3920
3921async fn inspect_entity_node(
3922 tenant: &TenantHandle,
3923 tenant_id: &str,
3924 entity_value: String,
3925 node_id_full: String,
3926) -> Result<GraphInspectResponse, ApiError> {
3927 let entity_q = entity_value.clone();
3930 let rows: Vec<TripleRow> = tenant
3931 .read()
3932 .interact(move |conn| {
3933 let mut stmt = conn.prepare(
3934 "SELECT subject_id, predicate, object_id, confidence
3935 FROM triples
3936 WHERE (subject_id = ?1 OR object_id = ?1)
3937 AND status = 'active'
3938 ORDER BY valid_from_ms DESC
3939 LIMIT ?2",
3940 )?;
3941 stmt.query_map(
3942 rusqlite::params![&entity_q, GRAPH_INSPECT_ENTITY_TRIPLES_CAP],
3943 |r| {
3944 Ok(TripleRow {
3945 subject_id: r.get(0)?,
3946 predicate: r.get(1)?,
3947 object_id: r.get(2)?,
3948 confidence: r.get(3)?,
3949 })
3950 },
3951 )?
3952 .collect::<rusqlite::Result<Vec<_>>>()
3953 })
3954 .await
3955 .map_err(ApiError::from)?;
3956
3957 if rows.is_empty() {
3958 return Err(ApiError::not_found(format!(
3959 "node_id {node_id_full:?} (entity {entity_value:?}) not found in current tenant -- entities must be referenced by at least one triple to be inspectable"
3960 )));
3961 }
3962
3963 let mut triples_out = Vec::with_capacity(rows.len());
3968 for t in rows {
3969 let other = if t.subject_id == entity_value {
3970 t.object_id
3971 } else {
3972 t.subject_id
3974 };
3975 let tgt_id = format!("ent:{other}");
3976 triples_out.push(GraphEdge {
3977 id: edge_id(&node_id_full, "triple", &tgt_id),
3978 source: node_id_full.clone(),
3979 target: tgt_id,
3980 kind: "triple",
3981 predicate: Some(t.predicate),
3982 weight: Some(t.confidence),
3983 });
3984 }
3985
3986 Ok(GraphInspectResponse {
3987 node: graph_node_for_entity(tenant_id, &entity_value),
3988 full_text: None,
3989 triples_in: Vec::new(),
3990 triples_out,
3991 })
3992}
3993
/// `limit` used by the graph neighbors endpoint when the query omits it.
const GRAPH_NEIGHBORS_DEFAULT_LIMIT: u32 = 25;
/// Upper bound that the neighbors endpoint clamps `limit` to.
const GRAPH_NEIGHBORS_MAX_LIMIT: u32 = 100;
/// Default minimum semantic weight (`max(1 - cos_distance, 0)`) a semantic
/// neighbor must reach to be returned.
const GRAPH_NEIGHBORS_DEFAULT_THRESHOLD: f32 = 0.75;
4066
/// Which neighbor sources the neighbors endpoint consults.
///
/// Parsed from the `kind` query parameter as `explicit`, `semantic` or `both`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
enum GraphNeighborsKind {
    /// Stored edges only (see `neighbors_explicit`).
    Explicit,
    /// Vector-similarity neighbors only (see `neighbors_semantic`).
    Semantic,
    /// Both sources; used when `kind` is omitted.
    #[default]
    Both,
}
4077
/// Query parameters of the graph neighbors endpoint. Every field is optional;
/// the handler applies the defaults.
#[derive(Debug, Deserialize)]
struct GraphNeighborsQuery {
    /// Neighbor source selection; defaults to `both`.
    #[serde(default)]
    kind: Option<GraphNeighborsKind>,
    /// Minimum semantic weight in `[0.0, 1.0]`; defaults to
    /// `GRAPH_NEIGHBORS_DEFAULT_THRESHOLD`.
    #[serde(default)]
    threshold: Option<f32>,
    /// Per-source result cap; defaults to `GRAPH_NEIGHBORS_DEFAULT_LIMIT` and
    /// is clamped into `[1, GRAPH_NEIGHBORS_MAX_LIMIT]`.
    #[serde(default)]
    limit: Option<u32>,
}
4087
4088async fn graph_neighbors_handler(
4090 TenantExtractor(tenant): TenantExtractor,
4091 Path(id): Path<String>,
4092 Query(q): Query<GraphNeighborsQuery>,
4093) -> Result<Json<GraphExpandResponse>, ApiError> {
4094 let kind = q.kind.unwrap_or_default();
4095 let threshold = q.threshold.unwrap_or(GRAPH_NEIGHBORS_DEFAULT_THRESHOLD);
4096 if !(0.0..=1.0).contains(&threshold) {
4097 return Err(ApiError::bad_request(format!(
4098 "threshold must be in [0.0, 1.0]; got {threshold}"
4099 )));
4100 }
4101 let limit_raw = q.limit.unwrap_or(GRAPH_NEIGHBORS_DEFAULT_LIMIT);
4105 let limit = limit_raw.clamp(1, GRAPH_NEIGHBORS_MAX_LIMIT);
4106
4107 let (node_kind, value) = parse_node_id(&id)?;
4108 let value_owned = value.to_string();
4109 let tenant_id_str = tenant.tenant_id().to_string();
4110 let node_id_full = id;
4111
4112 ensure_neighbors_focal_exists(&tenant, node_kind, &value_owned, &node_id_full).await?;
4119
4120 let (explicit_nodes, explicit_edges) = if matches!(
4122 kind,
4123 GraphNeighborsKind::Explicit | GraphNeighborsKind::Both
4124 ) {
4125 neighbors_explicit(
4126 &tenant,
4127 &tenant_id_str,
4128 node_kind,
4129 &value_owned,
4130 &node_id_full,
4131 limit as i64,
4132 )
4133 .await?
4134 } else {
4135 (Vec::new(), Vec::new())
4136 };
4137
4138 let (semantic_nodes, semantic_edges) = if matches!(
4139 kind,
4140 GraphNeighborsKind::Semantic | GraphNeighborsKind::Both
4141 ) {
4142 match neighbors_semantic(
4143 &tenant,
4144 &tenant_id_str,
4145 node_kind,
4146 &value_owned,
4147 &node_id_full,
4148 limit,
4149 threshold,
4150 )
4151 .await
4152 {
4153 Ok(parts) => parts,
4154 Err(e) => {
4155 if matches!(kind, GraphNeighborsKind::Semantic) {
4166 return Err(e);
4167 }
4168 (Vec::new(), Vec::new())
4169 }
4170 }
4171 } else {
4172 (Vec::new(), Vec::new())
4173 };
4174
4175 let mut explicit_endpoints: std::collections::HashSet<(String, String)> =
4178 std::collections::HashSet::with_capacity(explicit_edges.len());
4179 for e in &explicit_edges {
4180 explicit_endpoints.insert((e.source.clone(), e.target.clone()));
4181 }
4182
4183 let mut nodes: Vec<GraphNode> = Vec::with_capacity(explicit_nodes.len() + semantic_nodes.len());
4184 let mut edges: Vec<GraphEdge> =
4185 Vec::with_capacity(explicit_edges.len() + semantic_edges.len());
4186 let mut seen_node_ids: std::collections::HashSet<String> =
4187 std::collections::HashSet::with_capacity(explicit_nodes.len() + semantic_nodes.len());
4188
4189 for n in explicit_nodes {
4190 if seen_node_ids.insert(n.id.clone()) {
4191 nodes.push(n);
4192 }
4193 }
4194 for e in explicit_edges {
4195 edges.push(e);
4196 }
4197 for n in semantic_nodes {
4198 if seen_node_ids.insert(n.id.clone()) {
4199 nodes.push(n);
4200 }
4201 }
4202 for e in semantic_edges {
4203 if explicit_endpoints.contains(&(e.source.clone(), e.target.clone())) {
4204 continue;
4210 }
4211 edges.push(e);
4212 }
4213
4214 Ok(Json(GraphExpandResponse { nodes, edges }))
4215}
4216
4217async fn ensure_neighbors_focal_exists(
4224 tenant: &TenantHandle,
4225 node_kind: NodeKind,
4226 value: &str,
4227 node_id_full: &str,
4228) -> Result<(), ApiError> {
4229 match node_kind {
4230 NodeKind::Episode => ensure_episode_exists(tenant, value, node_id_full).await,
4231 NodeKind::Cluster => ensure_cluster_exists(tenant, value, node_id_full).await,
4232 NodeKind::Document => ensure_document_exists(tenant, value, node_id_full).await,
4233 NodeKind::Chunk => ensure_chunk_exists(tenant, value, node_id_full).await,
4234 NodeKind::Entity => ensure_entity_referenced(tenant, value, node_id_full).await,
4235 }
4236}
4237
4238async fn ensure_chunk_exists(
4242 tenant: &TenantHandle,
4243 chunk_id: &str,
4244 node_id_full: &str,
4245) -> Result<(), ApiError> {
4246 let chunk_id_q = chunk_id.to_string();
4247 let exists: i64 = tenant
4248 .read()
4249 .interact(move |conn| {
4250 conn.query_row(
4251 "SELECT COUNT(*)
4252 FROM document_chunks c
4253 JOIN documents d ON d.doc_id = c.doc_id
4254 WHERE c.chunk_id = ?1
4255 AND d.status = 'active'",
4256 rusqlite::params![&chunk_id_q],
4257 |r| r.get(0),
4258 )
4259 })
4260 .await
4261 .map_err(ApiError::from)?;
4262 if exists == 0 {
4263 return Err(ApiError::not_found(format!(
4264 "node_id {node_id_full:?} not found in current tenant"
4265 )));
4266 }
4267 Ok(())
4268}
4269
4270async fn ensure_entity_referenced(
4274 tenant: &TenantHandle,
4275 entity_value: &str,
4276 node_id_full: &str,
4277) -> Result<(), ApiError> {
4278 let entity_q = entity_value.to_string();
4279 let exists: i64 = tenant
4280 .read()
4281 .interact(move |conn| {
4282 conn.query_row(
4283 "SELECT COUNT(*)
4284 FROM triples
4285 WHERE (subject_id = ?1 OR object_id = ?1)
4286 AND status = 'active'",
4287 rusqlite::params![&entity_q],
4288 |r| r.get(0),
4289 )
4290 })
4291 .await
4292 .map_err(ApiError::from)?;
4293 if exists == 0 {
4294 return Err(ApiError::not_found(format!(
4295 "node_id {node_id_full:?} (entity {entity_value:?}) not found in current tenant -- entities must be referenced by at least one triple to be neighborable"
4296 )));
4297 }
4298 Ok(())
4299}
4300
4301async fn neighbors_explicit(
4307 tenant: &TenantHandle,
4308 tenant_id: &str,
4309 node_kind: NodeKind,
4310 value: &str,
4311 node_id_full: &str,
4312 limit: i64,
4313) -> Result<(Vec<GraphNode>, Vec<GraphEdge>), ApiError> {
4314 let mut nodes: Vec<GraphNode> = Vec::new();
4315 let mut edges: Vec<GraphEdge> = Vec::new();
4316
4317 match node_kind {
4318 NodeKind::Episode => {
4319 let r1 = expand_cluster_member(tenant, tenant_id, node_kind, value, node_id_full, limit)
4327 .await?;
4328 nodes.extend(r1.nodes);
4329 edges.extend(r1.edges);
4330 let r2 =
4331 expand_triple(tenant, tenant_id, node_kind, value, node_id_full, limit).await?;
4332 nodes.extend(r2.nodes);
4333 edges.extend(r2.edges);
4334 }
4335 NodeKind::Document => {
4336 let r = expand_document_chunk(tenant, tenant_id, node_kind, value, node_id_full, limit)
4339 .await?;
4340 nodes.extend(r.nodes);
4341 edges.extend(r.edges);
4342 }
4343 NodeKind::Chunk => {
4344 let r = expand_document_chunk(tenant, tenant_id, node_kind, value, node_id_full, limit)
4347 .await?;
4348 nodes.extend(r.nodes);
4349 edges.extend(r.edges);
4350 }
4351 NodeKind::Cluster => {
4352 let r = expand_cluster_member(tenant, tenant_id, node_kind, value, node_id_full, limit)
4355 .await?;
4356 nodes.extend(r.nodes);
4357 edges.extend(r.edges);
4358 }
4359 NodeKind::Entity => {
4360 let r =
4363 expand_triple(tenant, tenant_id, node_kind, value, node_id_full, limit).await?;
4364 nodes.extend(r.nodes);
4365 edges.extend(r.edges);
4366 }
4367 }
4368 Ok((nodes, edges))
4369}
4370
4371async fn neighbors_semantic(
4385 tenant: &TenantHandle,
4386 tenant_id: &str,
4387 node_kind: NodeKind,
4388 value: &str,
4389 node_id_full: &str,
4390 limit: u32,
4391 threshold: f32,
4392) -> Result<(Vec<GraphNode>, Vec<GraphEdge>), ApiError> {
4393 match node_kind {
4394 NodeKind::Episode => {
4395 neighbors_semantic_from_episode(
4396 tenant,
4397 tenant_id,
4398 value,
4399 node_id_full,
4400 limit,
4401 threshold,
4402 )
4403 .await
4404 }
4405 NodeKind::Chunk => {
4406 neighbors_semantic_from_chunk(
4407 tenant,
4408 tenant_id,
4409 value,
4410 node_id_full,
4411 limit,
4412 threshold,
4413 )
4414 .await
4415 }
4416 _ => Err(ApiError::bad_request(format!(
4417 "semantic neighbors only valid for episode or chunk source; got {}",
4418 node_kind.as_wire_str()
4419 ))),
4420 }
4421}
4422
4423async fn neighbors_semantic_from_episode(
4424 tenant: &TenantHandle,
4425 tenant_id: &str,
4426 memory_id: &str,
4427 node_id_full: &str,
4428 limit: u32,
4429 threshold: f32,
4430) -> Result<(Vec<GraphNode>, Vec<GraphEdge>), ApiError> {
4431 let memory_id_q = memory_id.to_string();
4432 let memory_id_for_self_excl = memory_id.to_string();
4433 let content: Option<String> = tenant
4434 .read()
4435 .interact(move |conn| {
4436 conn.query_row(
4437 "SELECT content FROM episodes WHERE memory_id = ?1 AND status = 'active'",
4438 rusqlite::params![&memory_id_q],
4439 |r| r.get::<_, String>(0),
4440 )
4441 .map(Some)
4442 .or_else(|e| match e {
4443 rusqlite::Error::QueryReturnedNoRows => Ok(None),
4444 other => Err(other),
4445 })
4446 })
4447 .await
4448 .map_err(ApiError::from)?;
4449
4450 let Some(content) = content else {
4454 return Ok((Vec::new(), Vec::new()));
4455 };
4456
4457 let widened = (limit as usize).saturating_add(1).min(100);
4459 let result = solo_query::recall::run_recall_inner(
4460 tenant.embedder(),
4461 tenant.hnsw(),
4462 tenant.read(),
4463 &content,
4464 widened,
4465 )
4466 .await
4467 .map_err(ApiError::from)?;
4468
4469 let mut nodes = Vec::new();
4470 let mut edges = Vec::new();
4471 for hit in result.hits.into_iter() {
4472 if hit.memory_id == memory_id_for_self_excl {
4473 continue;
4475 }
4476 if nodes.len() as u32 >= limit {
4477 break;
4478 }
4479 let weight = (1.0 - hit.cos_distance).max(0.0);
4480 if weight < threshold {
4481 continue;
4482 }
4483 let target_id = format!("ep:{}", hit.memory_id);
4484 edges.push(GraphEdge {
4485 id: edge_id(node_id_full, "semantic", &target_id),
4486 source: node_id_full.to_string(),
4487 target: target_id,
4488 kind: "semantic",
4489 predicate: None,
4490 weight: Some(weight),
4491 });
4492 nodes.push(GraphNode {
4493 id: format!("ep:{}", hit.memory_id),
4494 kind: NodeKind::Episode.as_wire_str(),
4495 label: episode_label(&hit.content),
4496 ts_ms: None,
4497 tenant_id: tenant_id.to_string(),
4498 preview: Some(truncate_preview(&hit.content, GRAPH_PREVIEW_CHARS)),
4499 });
4500 }
4501 Ok((nodes, edges))
4502}
4503
4504async fn neighbors_semantic_from_chunk(
4505 tenant: &TenantHandle,
4506 tenant_id: &str,
4507 chunk_id: &str,
4508 node_id_full: &str,
4509 limit: u32,
4510 threshold: f32,
4511) -> Result<(Vec<GraphNode>, Vec<GraphEdge>), ApiError> {
4512 let chunk_id_q = chunk_id.to_string();
4513 let chunk_id_for_self_excl = chunk_id.to_string();
4514 let content: Option<String> = tenant
4515 .read()
4516 .interact(move |conn| {
4517 conn.query_row(
4518 "SELECT c.content
4519 FROM document_chunks c
4520 JOIN documents d ON d.doc_id = c.doc_id
4521 WHERE c.chunk_id = ?1
4522 AND d.status = 'active'",
4523 rusqlite::params![&chunk_id_q],
4524 |r| r.get::<_, String>(0),
4525 )
4526 .map(Some)
4527 .or_else(|e| match e {
4528 rusqlite::Error::QueryReturnedNoRows => Ok(None),
4529 other => Err(other),
4530 })
4531 })
4532 .await
4533 .map_err(ApiError::from)?;
4534
4535 let Some(content) = content else {
4536 return Ok((Vec::new(), Vec::new()));
4537 };
4538
4539 let widened = (limit as usize).saturating_add(1).min(100);
4540 let hits = solo_query::doc_search::run_doc_search_inner(
4541 tenant.embedder(),
4542 tenant.hnsw(),
4543 tenant.read(),
4544 &content,
4545 widened,
4546 )
4547 .await
4548 .map_err(ApiError::from)?;
4549
4550 let mut nodes = Vec::new();
4551 let mut edges = Vec::new();
4552 for hit in hits.into_iter() {
4553 if hit.chunk_id == chunk_id_for_self_excl {
4554 continue;
4555 }
4556 if nodes.len() as u32 >= limit {
4557 break;
4558 }
4559 let weight = (1.0 - hit.cos_distance).max(0.0);
4560 if weight < threshold {
4561 continue;
4562 }
4563 let target_id = format!("chunk:{}", hit.chunk_id);
4564 edges.push(GraphEdge {
4565 id: edge_id(node_id_full, "semantic", &target_id),
4566 source: node_id_full.to_string(),
4567 target: target_id,
4568 kind: "semantic",
4569 predicate: None,
4570 weight: Some(weight),
4571 });
4572 let exp = ExpandedChunk {
4573 chunk_id: hit.chunk_id.clone(),
4574 chunk_index: hit.chunk_index as i64,
4575 content: hit.content.clone(),
4576 };
4577 nodes.push(graph_node_for_chunk(tenant_id, &exp));
4578 }
4579 Ok((nodes, edges))
4580}
4581
/// Seconds between `heartbeat` events on the graph SSE stream; passed by
/// `graph_stream_handler` as the heartbeat period of `build_invalidate_stream`.
pub const STREAM_HEARTBEAT_SECS: u64 = 30;

/// SSE event name of the first event sent on a new graph stream.
const STREAM_EVENT_INIT: &str = "init";

/// SSE event name of forwarded `InvalidateEvent`s.
const STREAM_EVENT_INVALIDATE: &str = "invalidate";

/// SSE event name of periodic heartbeat events.
const STREAM_EVENT_HEARTBEAT: &str = "heartbeat";
4634
4635async fn graph_stream_handler(
4655 TenantExtractor(tenant): TenantExtractor,
4656) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
4657 let rx = tenant.invalidate_sender().subscribe();
4662 let tenant_id = tenant.tenant_id().to_string();
4663 let stream = build_invalidate_stream(rx, tenant_id, STREAM_HEARTBEAT_SECS);
4664 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(3600)))
4671}
4672
/// Per-subscriber state threaded through `futures::stream::unfold` by
/// `build_invalidate_stream`.
struct StreamState {
    /// Subscription to the tenant's invalidate broadcast channel.
    rx: broadcast::Receiver<InvalidateEvent>,
    /// Timer driving the periodic `heartbeat` events.
    heartbeat: tokio::time::Interval,
    /// Tenant id echoed in the `init` event payload.
    tenant_id: String,
    /// `true` until the one-time `init` event has been emitted.
    needs_init: bool,
}
4684
4685fn build_invalidate_stream(
4693 rx: broadcast::Receiver<InvalidateEvent>,
4694 tenant_id: String,
4695 heartbeat_secs: u64,
4696) -> impl Stream<Item = Result<Event, Infallible>> {
4697 let start_at = tokio::time::Instant::now() + Duration::from_secs(heartbeat_secs);
4703 let heartbeat =
4704 tokio::time::interval_at(start_at, Duration::from_secs(heartbeat_secs));
4705
4706 let state = StreamState {
4707 rx,
4708 heartbeat,
4709 tenant_id,
4710 needs_init: true,
4711 };
4712 futures::stream::unfold(state, move |mut state| async move {
4713 if state.needs_init {
4717 state.needs_init = false;
4718 let init_payload = serde_json::json!({
4719 "connected": true,
4720 "tenant_id": state.tenant_id,
4721 "ts_ms": chrono::Utc::now().timestamp_millis(),
4722 });
4723 let ev = Event::default()
4724 .event(STREAM_EVENT_INIT)
4725 .json_data(init_payload)
4726 .unwrap_or_else(|_| Event::default().event(STREAM_EVENT_INIT));
4727 return Some((Ok::<Event, Infallible>(ev), state));
4728 }
4729 loop {
4730 tokio::select! {
4731 event = state.rx.recv() => {
4732 match event {
4733 Ok(ev) => {
4734 let sse_event = Event::default()
4735 .event(STREAM_EVENT_INVALIDATE)
4736 .json_data(&ev)
4737 .unwrap_or_else(|_| Event::default()
4738 .event(STREAM_EVENT_INVALIDATE));
4739 return Some((Ok::<Event, Infallible>(sse_event), state));
4740 }
4741 Err(broadcast::error::RecvError::Lagged(n)) => {
4742 tracing::warn!(
4743 lagged = n,
4744 "graph stream subscriber lagged; client will \
4745 resync on the next real invalidate"
4746 );
4747 }
4750 Err(broadcast::error::RecvError::Closed) => {
4751 tracing::debug!(
4752 "graph stream broadcast closed; ending SSE stream"
4753 );
4754 return None;
4755 }
4756 }
4757 }
4758 _ = state.heartbeat.tick() => {
4759 let hb_payload = serde_json::json!({
4760 "ts_ms": chrono::Utc::now().timestamp_millis(),
4761 });
4762 let sse_event = Event::default()
4763 .event(STREAM_EVENT_HEARTBEAT)
4764 .json_data(hb_payload)
4765 .unwrap_or_else(|_| Event::default()
4766 .event(STREAM_EVENT_HEARTBEAT));
4767 return Some((Ok::<Event, Infallible>(sse_event), state));
4768 }
4769 }
4770 }
4771 })
4772}
4773
/// One entry of the tenants list response.
#[derive(Debug, Clone, Serialize)]
struct TenantListItem {
    /// Tenant id in its string form.
    id: String,
    /// Optional human-readable name; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    display_name: Option<String>,
    /// Creation timestamp (epoch milliseconds presumably -- confirm).
    created_at_ms: i64,
    /// Last access timestamp, if recorded; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    last_accessed_ms: Option<i64>,
    /// Lifecycle status as exposed on the wire.
    status: TenantStatusJson,
    /// Storage quota in bytes, if set; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    quota_bytes: Option<u64>,
    /// Episode count. Always serialized (`null` when `None`);
    /// `tenants_list_handler` currently leaves it `None`.
    episode_count: Option<i64>,
    /// Size in bytes. Always serialized; currently `None` from the list handler.
    size_bytes: Option<u64>,
    /// Percentage of the quota used. Always serialized; currently `None` from
    /// the list handler.
    pct_used: Option<f64>,
}
4889
/// Wire representation of a tenant's lifecycle status.
///
/// It has a single variant; the `From<&solo_storage::TenantStatus>` impl maps
/// every storage status onto it.
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "snake_case")]
enum TenantStatusJson {
    /// Serialized as `"active"`.
    Active,
}
4901
4902impl From<&solo_storage::TenantStatus> for TenantStatusJson {
4903 fn from(s: &solo_storage::TenantStatus) -> Self {
4904 match s {
4908 solo_storage::TenantStatus::Active => TenantStatusJson::Active,
4909 solo_storage::TenantStatus::PendingMigration
4913 | solo_storage::TenantStatus::PendingDelete => TenantStatusJson::Active,
4914 }
4915 }
4916}
4917
/// Response body of the tenants list endpoint.
#[derive(Debug, Serialize)]
struct TenantsListResponse {
    /// Tenants visible to the caller.
    tenants: Vec<TenantListItem>,
}
4923
4924async fn tenants_list_handler(
4937 State(state): State<SoloHttpState>,
4938 MaybePrincipal(maybe_principal): MaybePrincipal,
4939) -> Result<Json<TenantsListResponse>, ApiError> {
4940 let mut records = state.registry.list_active().await.map_err(ApiError::from)?;
4946
4947 records.retain(|r| matches!(r.status, solo_storage::TenantStatus::Active));
4952
4953 let filtered = filter_tenants_for_principal(records, maybe_principal.as_ref());
4958
4959 let tenants = filtered
4960 .iter()
4961 .map(|r| TenantListItem {
4962 id: r.tenant_id.to_string(),
4963 display_name: r.display_name.clone(),
4964 created_at_ms: r.created_at_ms,
4965 last_accessed_ms: r.last_accessed_ms,
4966 status: TenantStatusJson::from(&r.status),
4967 quota_bytes: r.quota_bytes,
4968 episode_count: None,
4972 size_bytes: None,
4973 pct_used: None,
4974 })
4975 .collect();
4976
4977 Ok(Json(TenantsListResponse { tenants }))
4978}
4979
4980fn filter_tenants_for_principal(
4993 records: Vec<solo_storage::TenantRecord>,
4994 principal: Option<&AuthenticatedPrincipal>,
4995) -> Vec<solo_storage::TenantRecord> {
4996 let Some(p) = principal else {
4997 return records;
5000 };
5001 if is_single_principal_bearer(p) {
5002 return records;
5005 }
5006 let Some(claim) = p.tenant_claim.as_ref() else {
5010 return Vec::new();
5011 };
5012 records
5013 .into_iter()
5014 .filter(|r| r.tenant_id == *claim)
5015 .collect()
5016}
5017
5018fn is_single_principal_bearer(principal: &AuthenticatedPrincipal) -> bool {
5030 principal.subject == "bearer"
5031 && principal.claims.is_null()
5032 && principal.scopes.is_empty()
5033}
5034
/// Error returned by the HTTP handlers: an HTTP status plus a message.
///
/// Its `IntoResponse` impl renders it as the JSON body
/// `{ "error": message, "status": code }`.
#[derive(Debug)]
pub struct ApiError {
    /// HTTP status code sent with the response.
    status: StatusCode,
    /// Human-readable message placed in the `error` field of the body.
    message: String,
}
5044
5045impl ApiError {
5046 fn bad_request(msg: impl Into<String>) -> Self {
5047 Self {
5048 status: StatusCode::BAD_REQUEST,
5049 message: msg.into(),
5050 }
5051 }
5052 fn not_found(msg: impl Into<String>) -> Self {
5053 Self {
5054 status: StatusCode::NOT_FOUND,
5055 message: msg.into(),
5056 }
5057 }
5058 fn internal(msg: impl Into<String>) -> Self {
5059 Self {
5060 status: StatusCode::INTERNAL_SERVER_ERROR,
5061 message: msg.into(),
5062 }
5063 }
5064}
5065
5066impl From<solo_core::Error> for ApiError {
5067 fn from(e: solo_core::Error) -> Self {
5068 use solo_core::Error;
5069 match e {
5070 Error::NotFound(msg) => ApiError::not_found(msg),
5071 Error::InvalidInput(msg) => ApiError::bad_request(msg),
5072 Error::Conflict(msg) => Self {
5073 status: StatusCode::CONFLICT,
5074 message: msg,
5075 },
5076 other => ApiError::internal(other.to_string()),
5077 }
5078 }
5079}
5080
5081impl IntoResponse for ApiError {
5082 fn into_response(self) -> Response {
5083 let body = serde_json::json!({
5084 "error": self.message,
5085 "status": self.status.as_u16(),
5086 });
5087 (self.status, Json(body)).into_response()
5088 }
5089}
5090
5091#[cfg(test)]
5095mod handler_tests {
5096 use super::*;
5105 use axum::body::Body;
5106 use axum::http::{Request, StatusCode};
5107 use http_body_util::BodyExt;
5108 use serde_json::{Value, json};
5109 use solo_storage::test_support::StubVectorIndex;
5110 use solo_storage::{
5111 EmbedderConfig, IdentityConfig, KeyMaterial, ReaderPool, SoloConfig,
5112 StubEmbedder, TenantHandle, TenantRegistry, WriterActor, WriterSpawn,
5113 };
5114 use solo_core::VectorIndex;
5115 use std::sync::Arc as StdArc;
5116 use tower::ServiceExt;
5117
5118 fn fake_config(dim: u32) -> SoloConfig {
5119 SoloConfig {
5120 schema_version: 1,
5121 salt_hex: "00000000000000000000000000000000".to_string(),
5122 embedder: EmbedderConfig {
5123 name: "stub".to_string(),
5124 version: "v1".to_string(),
5125 dim,
5126 dtype: "f32".to_string(),
5127 },
5128 identity: IdentityConfig::default(),
5129 documents: solo_storage::DocumentConfig::default(),
5130 auth: None,
5131 audit: solo_storage::AuditSettings::default(),
5132 redaction: solo_storage::RedactionConfig::default(),
5133 llm: None,
5134 triples: solo_storage::TriplesConfig::default(),
5135 sampling: solo_storage::SamplingConfig::default(),
5136 }
5137 }
5138
5139 struct Harness {
5140 router: axum::Router,
5141 _tmp: tempfile::TempDir,
5142 db_path: std::path::PathBuf,
5143 write_handle_extra: Option<solo_storage::WriteHandle>,
5144 join: Option<std::thread::JoinHandle<()>>,
5145 tenant_handle: StdArc<TenantHandle>,
5150 registry: StdArc<TenantRegistry>,
5154 }
5155
5156 impl Harness {
5157 fn invalidate_sender(&self) -> tokio::sync::broadcast::Sender<InvalidateEvent> {
5164 self.tenant_handle.invalidate_sender().clone()
5165 }
5166 }
5167
5168 impl Harness {
5169 fn new(runtime: &tokio::runtime::Runtime) -> Self {
5170 Self::new_with_auth(runtime, None)
5171 }
5172
5173 fn open_db(&self) -> rusqlite::Connection {
5177 solo_storage::test_support::open_test_db_at(&self.db_path)
5178 }
5179
5180 fn new_with_auth(
5181 runtime: &tokio::runtime::Runtime,
5182 bearer_token: Option<String>,
5183 ) -> Self {
5184 Self::new_with_auth_config(
5185 runtime,
5186 bearer_token.map(|token| crate::auth::AuthConfig::Bearer { token }),
5187 )
5188 }
5189
5190 fn new_with_auth_config(
5191 runtime: &tokio::runtime::Runtime,
5192 auth: Option<crate::auth::AuthConfig>,
5193 ) -> Self {
5194 use solo_storage::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
5195
5196 let tmp = tempfile::TempDir::new().unwrap();
5197 let dim = 16usize;
5198 let hnsw: StdArc<dyn VectorIndex + Send + Sync> = StdArc::new(StubVectorIndex::new(dim));
5199 let embedder: StdArc<dyn solo_core::Embedder> =
5200 StdArc::new(StubEmbedder::new("stub", "v1", dim));
5201 let path = tmp.path().join("test.db");
5202
5203 let embedder_id = {
5204 let conn = solo_storage::test_support::open_test_db_at(&path);
5205 get_or_insert_embedder_id(
5206 &conn,
5207 &EmbedderIdentity {
5208 name: "stub".into(),
5209 version: "v1".into(),
5210 dim: dim as u32,
5211 dtype: "f32".into(),
5212 },
5213 )
5214 .unwrap()
5215 };
5216
5217 let conn = solo_storage::test_support::open_test_db_at(&path);
5218 let WriterSpawn { handle, join } = WriterActor::spawn_full(
5219 conn,
5220 hnsw.clone(),
5221 tmp.path().to_path_buf(),
5222 embedder_id,
5223 );
5224 let pool: ReaderPool =
5225 runtime.block_on(async { ReaderPool::new(&path, None, hnsw.clone()).unwrap() });
5226
5227 let tenant_id = solo_core::TenantId::default_tenant();
5230 let tenant_handle = StdArc::new(
5231 TenantHandle::from_parts_for_tests(
5232 tenant_id.clone(),
5233 fake_config(dim as u32),
5234 path.clone(),
5235 tmp.path().to_path_buf(),
5236 embedder_id,
5237 hnsw,
5238 embedder.clone(),
5239 handle.clone(),
5240 std::thread::spawn(|| {}),
5246 pool,
5247 ),
5248 );
5249 let tenant_handle_clone = tenant_handle.clone();
5250
5251 let key = KeyMaterial::from_bytes_for_tests([0u8; 32]);
5255 let registry = StdArc::new(TenantRegistry::for_tests_with_single_tenant(
5256 tmp.path().to_path_buf(),
5257 key,
5258 embedder,
5259 tenant_handle,
5260 ));
5261 let registry_clone = registry.clone();
5262
5263 let state = SoloHttpState {
5264 registry,
5265 default_tenant: tenant_id,
5266 user_aliases: Arc::new(Vec::new()),
5267 };
5268 let router = router_with_auth_config(state, auth);
5269 Harness {
5270 router,
5271 _tmp: tmp,
5272 db_path: path,
5273 write_handle_extra: Some(handle),
5274 join: Some(join),
5275 tenant_handle: tenant_handle_clone,
5276 registry: registry_clone,
5277 }
5278 }
5279
5280 fn shutdown(mut self, runtime: &tokio::runtime::Runtime) {
5281 let join = self.join.take();
5282 let extra = self.write_handle_extra.take();
5283 let tenant_handle = self.tenant_handle;
5290 let registry = self.registry;
5296 runtime.block_on(async move {
5297 drop(extra);
5298 drop(tenant_handle); drop(registry); drop(self.router); drop(self._tmp);
5302 if let Some(join) = join {
5303 let (tx, rx) = std::sync::mpsc::channel();
5304 std::thread::spawn(move || {
5305 let _ = tx.send(join.join());
5306 });
5307 tokio::task::spawn_blocking(move || {
5308 rx.recv_timeout(std::time::Duration::from_secs(5))
5309 })
5310 .await
5311 .expect("blocking task")
5312 .expect("writer thread did not exit within 5s")
5313 .expect("writer thread panicked");
5314 }
5315 });
5316 }
5317 }
5318
5319 fn rt() -> tokio::runtime::Runtime {
5320 tokio::runtime::Builder::new_multi_thread()
5321 .worker_threads(2)
5322 .enable_all()
5323 .build()
5324 .unwrap()
5325 }
5326
5327 async fn call(
5331 router: axum::Router,
5332 method: &str,
5333 uri: &str,
5334 body: Option<Value>,
5335 ) -> (StatusCode, Value) {
5336 call_with_auth(router, method, uri, body, None).await
5337 }
5338
5339 async fn call_with_auth(
5340 router: axum::Router,
5341 method: &str,
5342 uri: &str,
5343 body: Option<Value>,
5344 auth: Option<&str>,
5345 ) -> (StatusCode, Value) {
5346 let mut req_builder = Request::builder()
5347 .method(method)
5348 .uri(uri)
5349 .header("content-type", "application/json");
5350 if let Some(a) = auth {
5351 req_builder = req_builder.header("authorization", a);
5352 }
5353 let req = if let Some(b) = body {
5354 let bytes = serde_json::to_vec(&b).unwrap();
5355 req_builder.body(Body::from(bytes)).unwrap()
5356 } else {
5357 req_builder = req_builder.header("content-length", "0");
5358 req_builder.body(Body::empty()).unwrap()
5359 };
5360 let resp = router.oneshot(req).await.expect("oneshot");
5361 let status = resp.status();
5362 let body_bytes = resp.into_body().collect().await.unwrap().to_bytes();
5363 let v: Value = if body_bytes.is_empty() {
5364 Value::Null
5365 } else {
5366 serde_json::from_slice(&body_bytes).unwrap_or(Value::Null)
5367 };
5368 (status, v)
5369 }
5370
5371 #[test]
5372 fn health_returns_ok() {
5373 let runtime = rt();
5374 let h = Harness::new(&runtime);
5375 let r = h.router.clone();
5376 let (status, _body) = runtime.block_on(call(r, "GET", "/health", None));
5377 assert_eq!(status, StatusCode::OK);
5378 h.shutdown(&runtime);
5379 }
5380
5381 #[test]
5386 fn openapi_json_describes_all_endpoints() {
5387 let runtime = rt();
5388 let h = Harness::new(&runtime);
5389 let r = h.router.clone();
5390 let (status, spec) = runtime.block_on(call(r, "GET", "/openapi.json", None));
5391 assert_eq!(status, StatusCode::OK);
5392 assert!(spec.is_object(), "openapi.json must be a JSON object");
5393
5394 assert!(
5396 spec.get("openapi")
5397 .and_then(|v| v.as_str())
5398 .is_some_and(|s| s.starts_with("3.")),
5399 "missing or wrong openapi version: {spec}"
5400 );
5401 assert!(spec.pointer("/info/title").is_some());
5402 assert!(spec.pointer("/info/version").is_some());
5403
5404 let paths = spec
5406 .get("paths")
5407 .and_then(|v| v.as_object())
5408 .expect("paths must be an object");
5409 for expected in [
5410 "/health",
5411 "/openapi.json",
5412 "/memory",
5413 "/memory/search",
5414 "/memory/consolidate",
5415 "/memory/{id}",
5416 "/memory/themes",
5418 "/memory/facts_about",
5419 "/memory/contradictions",
5420 "/memory/clusters/{cluster_id}",
5422 "/memory/documents",
5424 "/memory/documents/search",
5425 "/memory/documents/{id}",
5426 ] {
5427 assert!(
5428 paths.contains_key(expected),
5429 "openapi paths missing {expected}: {paths:?}"
5430 );
5431 }
5432
5433 let docs = paths.get("/memory/documents").expect("/memory/documents");
5436 assert!(docs.get("post").is_some(), "POST /memory/documents undocumented");
5437 assert!(docs.get("get").is_some(), "GET /memory/documents undocumented");
5438
5439 let docid = paths
5442 .get("/memory/documents/{id}")
5443 .expect("/memory/documents/{id}");
5444 assert!(
5445 docid.get("get").is_some(),
5446 "GET /memory/documents/{{id}} undocumented"
5447 );
5448 assert!(
5449 docid.get("delete").is_some(),
5450 "DELETE /memory/documents/{{id}} undocumented"
5451 );
5452
5453 let memid = paths.get("/memory/{id}").expect("memory/{id}");
5456 assert!(memid.get("get").is_some(), "GET /memory/{{id}} undocumented");
5457 assert!(
5458 memid.get("delete").is_some(),
5459 "DELETE /memory/{{id}} undocumented"
5460 );
5461
5462 for schema_name in [
5464 "RememberRequest",
5465 "RememberResponse",
5466 "RecallRequest",
5467 "RecallResult",
5468 "EpisodeRecord",
5469 "ApiError",
5470 "ConsolidationScope",
5471 "ConsolidationReport",
5472 "ThemeHit",
5474 "FactHit",
5475 "ContradictionHit",
5476 "ClusterRecord",
5478 "IngestDocumentRequest",
5480 "IngestReport",
5481 "ForgetDocumentReport",
5482 "SearchDocsRequest",
5483 "DocSearchHit",
5484 "DocumentInspectResult",
5485 "DocumentSummary",
5486 ] {
5487 let ptr = format!("/components/schemas/{schema_name}");
5488 assert!(
5489 spec.pointer(&ptr).is_some(),
5490 "component schema {schema_name} missing"
5491 );
5492 }
5493
5494 assert!(
5496 spec.pointer("/components/securitySchemes/bearerAuth")
5497 .is_some(),
5498 "bearerAuth security scheme missing"
5499 );
5500
5501 h.shutdown(&runtime);
5502 }
5503
5504 #[test]
5508 fn openapi_json_is_exempt_from_bearer_auth() {
5509 let runtime = rt();
5510 let h = Harness::new_with_auth(&runtime, Some("super-secret".into()));
5511 let r = h.router.clone();
5512 let (status, _body) = runtime.block_on(call(r, "GET", "/openapi.json", None));
5514 assert_eq!(status, StatusCode::OK);
5515 h.shutdown(&runtime);
5516 }
5517
5518 #[test]
5519 fn remember_returns_memory_id() {
5520 let runtime = rt();
5521 let h = Harness::new(&runtime);
5522 let r = h.router.clone();
5523 let (status, body) = runtime.block_on(call(
5524 r,
5525 "POST",
5526 "/memory",
5527 Some(json!({ "content": "http harness test" })),
5528 ));
5529 assert_eq!(status, StatusCode::OK);
5530 let mid = body.get("memory_id").and_then(|v| v.as_str()).unwrap();
5531 assert_eq!(mid.len(), 36, "uuid length");
5532 h.shutdown(&runtime);
5533 }
5534
5535 #[test]
5536 fn empty_content_returns_400() {
5537 let runtime = rt();
5538 let h = Harness::new(&runtime);
5539 let r = h.router.clone();
5540 let (status, body) =
5541 runtime.block_on(call(r, "POST", "/memory", Some(json!({ "content": "" }))));
5542 assert_eq!(status, StatusCode::BAD_REQUEST);
5543 assert!(
5544 body.get("error")
5545 .and_then(|e| e.as_str())
5546 .map(|s| s.contains("must not be empty"))
5547 .unwrap_or(false),
5548 "got: {body}"
5549 );
5550 h.shutdown(&runtime);
5551 }
5552
5553 #[test]
5554 fn empty_query_returns_400() {
5555 let runtime = rt();
5556 let h = Harness::new(&runtime);
5557 let r = h.router.clone();
5558 let (status, body) = runtime.block_on(call(
5559 r,
5560 "POST",
5561 "/memory/search",
5562 Some(json!({ "query": "" })),
5563 ));
5564 assert_eq!(status, StatusCode::BAD_REQUEST);
5565 assert!(
5566 body.get("error")
5567 .and_then(|e| e.as_str())
5568 .map(|s| s.contains("must not be empty"))
5569 .unwrap_or(false),
5570 "got: {body}"
5571 );
5572 h.shutdown(&runtime);
5573 }
5574
5575 #[test]
5576 fn inspect_unknown_returns_404() {
5577 let runtime = rt();
5578 let h = Harness::new(&runtime);
5579 let r = h.router.clone();
5580 let (status, body) = runtime.block_on(call(
5581 r,
5582 "GET",
5583 "/memory/00000000-0000-7000-8000-000000000000",
5584 None,
5585 ));
5586 assert_eq!(status, StatusCode::NOT_FOUND);
5587 assert!(body.get("error").is_some(), "got: {body}");
5588 h.shutdown(&runtime);
5589 }
5590
5591 #[test]
5592 fn inspect_invalid_id_returns_400() {
5593 let runtime = rt();
5594 let h = Harness::new(&runtime);
5595 let r = h.router.clone();
5596 let (status, _body) = runtime.block_on(call(r, "GET", "/memory/not-a-uuid", None));
5597 assert_eq!(status, StatusCode::BAD_REQUEST);
5598 h.shutdown(&runtime);
5599 }
5600
5601 #[test]
5602 fn forget_unknown_returns_404() {
5603 let runtime = rt();
5604 let h = Harness::new(&runtime);
5605 let r = h.router.clone();
5606 let (status, _body) = runtime.block_on(call(
5607 r,
5608 "DELETE",
5609 "/memory/00000000-0000-7000-8000-000000000000",
5610 None,
5611 ));
5612 assert_eq!(status, StatusCode::NOT_FOUND);
5613 h.shutdown(&runtime);
5614 }
5615
5616 #[test]
5624 fn consolidate_endpoint_returns_report() {
5625 let runtime = rt();
5626 let h = Harness::new(&runtime);
5627 let r = h.router.clone();
5628 runtime.block_on(async move {
5629 let (status, body) = call(r.clone(), "POST", "/memory/consolidate", None).await;
5631 assert_eq!(status, StatusCode::OK);
5632 for field in [
5633 "episodes_seen",
5634 "clusters_built",
5635 "episodes_clustered",
5636 "abstractions_built",
5637 "triples_built",
5638 "contradictions_found",
5639 ] {
5640 assert!(
5641 body.get(field).and_then(|v| v.as_u64()).is_some(),
5642 "missing field {field}: {body}"
5643 );
5644 }
5645 assert_eq!(body["episodes_seen"], 0);
5646 assert_eq!(body["clusters_built"], 0);
5647
5648 let (status2, _body2) = call(
5651 r,
5652 "POST",
5653 "/memory/consolidate",
5654 Some(json!({ "window_days": 7 })),
5655 )
5656 .await;
5657 assert_eq!(status2, StatusCode::OK);
5658 });
5659 h.shutdown(&runtime);
5660 }
5661
5662 #[test]
5663 fn auth_required_routes_reject_missing_token() {
5664 let runtime = rt();
5665 let h = Harness::new_with_auth(&runtime, Some("secret-xyz".into()));
5666 let r = h.router.clone();
5667 runtime.block_on(async move {
5668 let (status, _body) = call(
5670 r.clone(),
5671 "POST",
5672 "/memory",
5673 Some(json!({ "content": "x" })),
5674 )
5675 .await;
5676 assert_eq!(status, StatusCode::UNAUTHORIZED);
5677
5678 let (status, _body) = call_with_auth(
5680 r.clone(),
5681 "POST",
5682 "/memory",
5683 Some(json!({ "content": "x" })),
5684 Some("Bearer wrong-token"),
5685 )
5686 .await;
5687 assert_eq!(status, StatusCode::UNAUTHORIZED);
5688
5689 let (status, body) = call_with_auth(
5691 r.clone(),
5692 "POST",
5693 "/memory",
5694 Some(json!({ "content": "authed" })),
5695 Some("Bearer secret-xyz"),
5696 )
5697 .await;
5698 assert_eq!(status, StatusCode::OK);
5699 assert!(body.get("memory_id").is_some());
5700 });
5701 h.shutdown(&runtime);
5702 }
5703
5704 #[test]
5705 fn health_endpoint_does_not_require_auth() {
5706 let runtime = rt();
5707 let h = Harness::new_with_auth(&runtime, Some("secret".into()));
5708 let r = h.router.clone();
5709 let (status, _body) = runtime.block_on(call(r, "GET", "/health", None));
5710 assert_eq!(status, StatusCode::OK);
5712 h.shutdown(&runtime);
5713 }
5714
5715 #[test]
5716 fn auth_response_includes_www_authenticate_header() {
5717 let runtime = rt();
5722 let h = Harness::new_with_auth(&runtime, Some("secret".into()));
5723 let r = h.router.clone();
5724 runtime.block_on(async move {
5725 let req = Request::builder()
5726 .method("POST")
5727 .uri("/memory")
5728 .header("content-type", "application/json")
5729 .body(Body::from(serde_json::to_vec(&json!({ "content": "x" })).unwrap()))
5730 .unwrap();
5731 let resp = r.oneshot(req).await.unwrap();
5732 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
5733 let www = resp
5734 .headers()
5735 .get("www-authenticate")
5736 .and_then(|v| v.to_str().ok())
5737 .unwrap_or("");
5738 assert!(
5739 www.starts_with("Bearer"),
5740 "expected WWW-Authenticate: Bearer..., got: {www}"
5741 );
5742 });
5743 h.shutdown(&runtime);
5744 }
5745
5746 fn base64_url_for_test(bytes: &[u8]) -> String {
5754 use base64::Engine;
5755 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
5756 }
5757
5758 async fn spin_fake_idp() -> (wiremock::MockServer, String, Vec<u8>, &'static str) {
5761 use wiremock::matchers::{method, path};
5762 use wiremock::{Mock, MockServer, ResponseTemplate};
5763 let server = MockServer::start().await;
5764 let secret = b"http-test-secret-for-hmac-fixture".to_vec();
5765 let kid = "http-test-kid";
5766 let discovery = serde_json::json!({
5767 "issuer": server.uri(),
5768 "jwks_uri": format!("{}/jwks", server.uri()),
5769 });
5770 Mock::given(method("GET"))
5771 .and(path("/.well-known/openid-configuration"))
5772 .respond_with(ResponseTemplate::new(200).set_body_json(discovery))
5773 .mount(&server)
5774 .await;
5775 let jwks = serde_json::json!({
5776 "keys": [
5777 {
5778 "kty": "oct",
5779 "kid": kid,
5780 "alg": "HS256",
5781 "k": base64_url_for_test(&secret),
5782 }
5783 ]
5784 });
5785 Mock::given(method("GET"))
5786 .and(path("/jwks"))
5787 .respond_with(ResponseTemplate::new(200).set_body_json(jwks))
5788 .mount(&server)
5789 .await;
5790 let discovery_url = format!("{}/.well-known/openid-configuration", server.uri());
5791 (server, discovery_url, secret, kid)
5792 }
5793
5794 fn mint_idp_token(
5795 server_uri: &str,
5796 kid: &str,
5797 secret: &[u8],
5798 tenant_claim: &str,
5799 audience: &str,
5800 ) -> String {
5801 use jsonwebtoken::{Algorithm, EncodingKey, Header};
5802 let mut header = Header::new(Algorithm::HS256);
5803 header.kid = Some(kid.to_string());
5804 let now = std::time::SystemTime::now()
5805 .duration_since(std::time::UNIX_EPOCH)
5806 .unwrap()
5807 .as_secs();
5808 let claims = serde_json::json!({
5809 "iss": server_uri,
5810 "sub": "test-user-1",
5811 "aud": audience,
5812 "exp": now + 600,
5813 "iat": now,
5814 "solo_tenant": tenant_claim,
5815 });
5816 jsonwebtoken::encode(&header, &claims, &EncodingKey::from_secret(secret))
5817 .expect("mint token")
5818 }
5819
5820 #[test]
5821 fn http_oidc_accept_resolves_to_tenant_from_claim() {
5822 let runtime = rt();
5823 let (fake_server, discovery_url, secret, kid) =
5824 runtime.block_on(async { spin_fake_idp().await });
5825 let server_uri = fake_server.uri();
5826 let _server_guard = fake_server;
5828
5829 let auth = crate::auth::AuthConfig::Oidc {
5830 discovery_url,
5831 audience: "test-audience".to_string(),
5832 tenant_claim_name: "solo_tenant".to_string(),
5833 };
5834 let h = Harness::new_with_auth_config(&runtime, Some(auth));
5835 let r = h.router.clone();
5836
5837 let token = mint_idp_token(
5839 &server_uri,
5840 kid,
5841 &secret,
5842 "default",
5843 "test-audience",
5844 );
5845
5846 runtime.block_on(async move {
5847 let (status, body) = call_with_auth(
5849 r.clone(),
5850 "POST",
5851 "/memory",
5852 Some(json!({ "content": "oidc-routed content" })),
5853 Some(&format!("Bearer {token}")),
5854 )
5855 .await;
5856 assert_eq!(status, StatusCode::OK, "got body: {body}");
5857 assert!(body.get("memory_id").is_some(), "no memory_id in {body}");
5858 });
5859 h.shutdown(&runtime);
5860 }
5861
5862 #[test]
5863 fn http_oidc_reject_missing_token_returns_401() {
5864 let runtime = rt();
5865 let (fake_server, discovery_url, _secret, _kid) =
5866 runtime.block_on(async { spin_fake_idp().await });
5867 let _server_guard = fake_server;
5868 let auth = crate::auth::AuthConfig::Oidc {
5869 discovery_url,
5870 audience: "test-audience".to_string(),
5871 tenant_claim_name: "solo_tenant".to_string(),
5872 };
5873 let h = Harness::new_with_auth_config(&runtime, Some(auth));
5874 let r = h.router.clone();
5875 runtime.block_on(async move {
5876 let (status, _body) =
5878 call(r.clone(), "POST", "/memory", Some(json!({ "content": "x" }))).await;
5879 assert_eq!(status, StatusCode::UNAUTHORIZED);
5880
5881 let (status, _body) = call_with_auth(
5883 r.clone(),
5884 "POST",
5885 "/memory",
5886 Some(json!({ "content": "x" })),
5887 Some("Bearer not-a-real-jwt"),
5888 )
5889 .await;
5890 assert_eq!(status, StatusCode::UNAUTHORIZED);
5891 });
5892 h.shutdown(&runtime);
5893 }
5894
5895 #[test]
5896 fn full_remember_recall_inspect_forget_round_trip() {
5897 let runtime = rt();
5898 let h = Harness::new(&runtime);
5899 let r = h.router.clone();
5900 runtime.block_on(async move {
5901 let (status, body) = call(
5903 r.clone(),
5904 "POST",
5905 "/memory",
5906 Some(json!({ "content": "round-trip content" })),
5907 )
5908 .await;
5909 assert_eq!(status, StatusCode::OK);
5910 let mid = body
5911 .get("memory_id")
5912 .and_then(|v| v.as_str())
5913 .unwrap()
5914 .to_string();
5915
5916 let (status, body) = call(
5918 r.clone(),
5919 "POST",
5920 "/memory/search",
5921 Some(json!({ "query": "round-trip content", "limit": 5 })),
5922 )
5923 .await;
5924 assert_eq!(status, StatusCode::OK);
5925 let hits = body.get("hits").and_then(|v| v.as_array()).unwrap();
5926 assert!(
5927 hits.iter()
5928 .any(|h| h.get("content").and_then(|c| c.as_str())
5929 == Some("round-trip content")),
5930 "expected hit with content; got: {body}"
5931 );
5932
5933 let (status, body) = call(r.clone(), "GET", &format!("/memory/{mid}"), None).await;
5935 assert_eq!(status, StatusCode::OK);
5936 assert_eq!(body.get("status").and_then(|v| v.as_str()), Some("active"));
5937
5938 let (status, _body) =
5940 call(r.clone(), "DELETE", &format!("/memory/{mid}"), None).await;
5941 assert_eq!(status, StatusCode::NO_CONTENT);
5942
5943 let (status, body) = call(r.clone(), "GET", &format!("/memory/{mid}"), None).await;
5945 assert_eq!(status, StatusCode::OK);
5946 assert_eq!(
5947 body.get("status").and_then(|v| v.as_str()),
5948 Some("forgotten")
5949 );
5950
5951 let (status, body) = call(
5953 r.clone(),
5954 "POST",
5955 "/memory/search",
5956 Some(json!({ "query": "round-trip content", "limit": 5 })),
5957 )
5958 .await;
5959 assert_eq!(status, StatusCode::OK);
5960 let hits = body.get("hits").and_then(|v| v.as_array()).unwrap();
5961 assert!(
5962 hits.iter().all(|h| h.get("memory_id").and_then(|m| m.as_str())
5963 != Some(mid.as_str())),
5964 "forgotten row should be excluded from recall: {body}"
5965 );
5966 });
5967 h.shutdown(&runtime);
5968 }
5969
5970 #[test]
5977 fn themes_endpoint_returns_empty_array_on_empty_db() {
5978 let runtime = rt();
5979 let h = Harness::new(&runtime);
5980 let r = h.router.clone();
5981 let (status, body) =
5982 runtime.block_on(call(r, "GET", "/memory/themes", None));
5983 assert_eq!(status, StatusCode::OK);
5984 assert!(body.is_array(), "expected array, got {body}");
5985 assert_eq!(body.as_array().unwrap().len(), 0);
5986 h.shutdown(&runtime);
5987 }
5988
5989 #[test]
5990 fn themes_endpoint_passes_through_query_params() {
5991 let runtime = rt();
5992 let h = Harness::new(&runtime);
5993 let r = h.router.clone();
5994 let (status, body) = runtime.block_on(call(
5995 r,
5996 "GET",
5997 "/memory/themes?window_days=7&limit=20",
5998 None,
5999 ));
6000 assert_eq!(status, StatusCode::OK);
6001 assert!(body.is_array(), "expected array, got {body}");
6002 h.shutdown(&runtime);
6003 }
6004
6005 #[test]
6006 fn facts_about_endpoint_requires_subject() {
6007 let runtime = rt();
6008 let h = Harness::new(&runtime);
6009 let r = h.router.clone();
6010 let (status, _body) =
6014 runtime.block_on(call(r, "GET", "/memory/facts_about", None));
6015 assert!(
6016 status == StatusCode::BAD_REQUEST
6017 || status == StatusCode::UNPROCESSABLE_ENTITY,
6018 "expected 400 or 422 for missing subject, got {status}"
6019 );
6020 h.shutdown(&runtime);
6021 }
6022
6023 #[test]
6024 fn facts_about_endpoint_rejects_blank_subject() {
6025 let runtime = rt();
6026 let h = Harness::new(&runtime);
6027 let r = h.router.clone();
6028 let (status, body) = runtime.block_on(call(
6031 r,
6032 "GET",
6033 "/memory/facts_about?subject=%20%20",
6034 None,
6035 ));
6036 assert_eq!(status, StatusCode::BAD_REQUEST);
6037 assert!(
6038 body.get("error")
6039 .and_then(|v| v.as_str())
6040 .is_some_and(|s| s.contains("subject")),
6041 "expected error mentioning subject, got {body}"
6042 );
6043 h.shutdown(&runtime);
6044 }
6045
6046 #[test]
6047 fn facts_about_endpoint_returns_empty_array_for_unknown_subject() {
6048 let runtime = rt();
6049 let h = Harness::new(&runtime);
6050 let r = h.router.clone();
6051 let (status, body) = runtime.block_on(call(
6052 r,
6053 "GET",
6054 "/memory/facts_about?subject=NobodyKnows",
6055 None,
6056 ));
6057 assert_eq!(status, StatusCode::OK);
6058 assert_eq!(body.as_array().unwrap().len(), 0);
6059 h.shutdown(&runtime);
6060 }
6061
6062 #[test]
6063 fn facts_about_endpoint_parses_include_as_object_query_param() {
6064 let runtime = rt();
6072 let h = Harness::new(&runtime);
6073 let r = h.router.clone();
6074 let (status, body) = runtime.block_on(call(
6075 r,
6076 "GET",
6077 "/memory/facts_about?subject=Maya&include_as_object=true",
6078 None,
6079 ));
6080 assert_eq!(
6081 status,
6082 StatusCode::OK,
6083 "expected 200 with include_as_object query param, got {status}"
6084 );
6085 assert!(body.is_array());
6086 h.shutdown(&runtime);
6087 }
6088
6089 #[test]
6090 fn inspect_cluster_endpoint_unknown_id_returns_404() {
6091 let runtime = rt();
6095 let h = Harness::new(&runtime);
6096 let r = h.router.clone();
6097 let (status, body) = runtime.block_on(call(
6098 r,
6099 "GET",
6100 "/memory/clusters/no-such-cluster",
6101 None,
6102 ));
6103 assert_eq!(status, StatusCode::NOT_FOUND);
6104 assert!(
6105 body.get("error")
6106 .and_then(|v| v.as_str())
6107 .is_some_and(|s| s.contains("no-such-cluster")),
6108 "expected error mentioning cluster id, got {body}"
6109 );
6110 h.shutdown(&runtime);
6111 }
6112
6113 #[test]
6114 fn inspect_cluster_endpoint_passes_full_content_query_param() {
6115 let runtime = rt();
6121 let h = Harness::new(&runtime);
6122 let r = h.router.clone();
6123 let (status, _body) = runtime.block_on(call(
6124 r,
6125 "GET",
6126 "/memory/clusters/missing?full_content=true",
6127 None,
6128 ));
6129 assert_eq!(status, StatusCode::NOT_FOUND);
6130 h.shutdown(&runtime);
6131 }
6132
6133 #[test]
6134 fn contradictions_endpoint_returns_empty_array_on_empty_db() {
6135 let runtime = rt();
6136 let h = Harness::new(&runtime);
6137 let r = h.router.clone();
6138 let (status, body) = runtime.block_on(call(
6139 r,
6140 "GET",
6141 "/memory/contradictions",
6142 None,
6143 ));
6144 assert_eq!(status, StatusCode::OK);
6145 assert!(body.is_array());
6146 assert_eq!(body.as_array().unwrap().len(), 0);
6147 h.shutdown(&runtime);
6148 }
6149
6150 #[test]
6151 fn derived_endpoints_require_bearer_when_auth_enabled() {
6152 let runtime = rt();
6153 let h = Harness::new_with_auth(&runtime, Some("secret-token".to_string()));
6154 for path in [
6161 "/memory/themes",
6162 "/memory/facts_about?subject=Sam",
6163 "/memory/contradictions",
6164 "/memory/clusters/any-id",
6165 ] {
6166 let (status, _) = runtime.block_on(call(h.router.clone(), "GET", path, None));
6167 assert_eq!(
6168 status,
6169 StatusCode::UNAUTHORIZED,
6170 "{path} should 401 without token"
6171 );
6172 }
6173 h.shutdown(&runtime);
6174 }
6175
6176 #[test]
6188 fn list_documents_endpoint_returns_empty_array_on_empty_db() {
6189 let runtime = rt();
6190 let h = Harness::new(&runtime);
6191 let r = h.router.clone();
6192 let (status, body) = runtime.block_on(call(r, "GET", "/memory/documents", None));
6193 assert_eq!(status, StatusCode::OK);
6194 assert!(body.is_array(), "expected array, got {body}");
6195 assert_eq!(body.as_array().unwrap().len(), 0);
6196 h.shutdown(&runtime);
6197 }
6198
6199 #[test]
6200 fn list_documents_endpoint_parses_query_params() {
6201 let runtime = rt();
6202 let h = Harness::new(&runtime);
6203 let r = h.router.clone();
6204 let (status, body) = runtime.block_on(call(
6205 r,
6206 "GET",
6207 "/memory/documents?limit=5&offset=0&include_forgotten=true",
6208 None,
6209 ));
6210 assert_eq!(status, StatusCode::OK);
6211 assert!(body.is_array());
6212 h.shutdown(&runtime);
6213 }
6214
6215 #[test]
6216 fn ingest_document_endpoint_rejects_empty_path() {
6217 let runtime = rt();
6218 let h = Harness::new(&runtime);
6219 let r = h.router.clone();
6220 let (status, body) = runtime.block_on(call(
6221 r,
6222 "POST",
6223 "/memory/documents",
6224 Some(json!({ "path": "" })),
6225 ));
6226 assert_eq!(status, StatusCode::BAD_REQUEST);
6227 assert!(
6228 body.get("error")
6229 .and_then(|v| v.as_str())
6230 .is_some_and(|s| s.contains("path")),
6231 "expected error mentioning path, got {body}"
6232 );
6233 h.shutdown(&runtime);
6234 }
6235
6236 #[test]
6237 fn search_docs_endpoint_rejects_empty_query() {
6238 let runtime = rt();
6239 let h = Harness::new(&runtime);
6240 let r = h.router.clone();
6241 let (status, body) = runtime.block_on(call(
6242 r,
6243 "POST",
6244 "/memory/documents/search",
6245 Some(json!({ "query": " " })),
6246 ));
6247 assert_eq!(status, StatusCode::BAD_REQUEST);
6248 assert!(
6249 body.get("error")
6250 .and_then(|v| v.as_str())
6251 .is_some_and(|s| s.contains("must not be empty")
6252 || s.contains("doc_search")),
6253 "expected error mentioning empty query, got {body}"
6254 );
6255 h.shutdown(&runtime);
6256 }
6257
6258 #[test]
6259 fn inspect_document_endpoint_unknown_id_returns_404() {
6260 let runtime = rt();
6261 let h = Harness::new(&runtime);
6262 let r = h.router.clone();
6263 let (status, body) = runtime.block_on(call(
6264 r,
6265 "GET",
6266 "/memory/documents/00000000-0000-7000-8000-000000000000",
6267 None,
6268 ));
6269 assert_eq!(status, StatusCode::NOT_FOUND);
6270 assert!(body.get("error").is_some(), "got: {body}");
6271 h.shutdown(&runtime);
6272 }
6273
6274 #[test]
6275 fn inspect_document_endpoint_rejects_malformed_id() {
6276 let runtime = rt();
6277 let h = Harness::new(&runtime);
6278 let r = h.router.clone();
6279 let (status, _body) =
6280 runtime.block_on(call(r, "GET", "/memory/documents/not-a-uuid", None));
6281 assert_eq!(status, StatusCode::BAD_REQUEST);
6282 h.shutdown(&runtime);
6283 }
6284
6285 #[test]
6286 fn forget_document_endpoint_unknown_id_returns_404() {
6287 let runtime = rt();
6290 let h = Harness::new(&runtime);
6291 let r = h.router.clone();
6292 let (status, _body) = runtime.block_on(call(
6293 r,
6294 "DELETE",
6295 "/memory/documents/00000000-0000-7000-8000-000000000000",
6296 None,
6297 ));
6298 assert_eq!(status, StatusCode::NOT_FOUND);
6299 h.shutdown(&runtime);
6300 }
6301
6302 #[test]
6303 fn forget_document_endpoint_rejects_malformed_id() {
6304 let runtime = rt();
6305 let h = Harness::new(&runtime);
6306 let r = h.router.clone();
6307 let (status, _body) =
6308 runtime.block_on(call(r, "DELETE", "/memory/documents/not-a-uuid", None));
6309 assert_eq!(status, StatusCode::BAD_REQUEST);
6310 h.shutdown(&runtime);
6311 }
6312
6313 #[test]
6314 fn document_endpoints_require_bearer_when_auth_enabled() {
6315 let runtime = rt();
6319 let h = Harness::new_with_auth(&runtime, Some("doc-secret".to_string()));
6320 let cases: &[(&str, &str, Option<Value>)] = &[
6321 ("POST", "/memory/documents", Some(json!({ "path": "/x" }))),
6322 ("GET", "/memory/documents", None),
6323 (
6324 "POST",
6325 "/memory/documents/search",
6326 Some(json!({ "query": "x" })),
6327 ),
6328 (
6329 "GET",
6330 "/memory/documents/00000000-0000-7000-8000-000000000000",
6331 None,
6332 ),
6333 (
6334 "DELETE",
6335 "/memory/documents/00000000-0000-7000-8000-000000000000",
6336 None,
6337 ),
6338 ];
6339 for (method, path, body) in cases {
6340 let (status, _) =
6341 runtime.block_on(call(h.router.clone(), method, path, body.clone()));
6342 assert_eq!(
6343 status,
6344 StatusCode::UNAUTHORIZED,
6345 "{method} {path} should 401 without token"
6346 );
6347 }
6348 h.shutdown(&runtime);
6349 }
6350
6351 #[test]
6352 fn document_endpoints_accept_correct_bearer_token() {
6353 let runtime = rt();
6359 let h = Harness::new_with_auth(&runtime, Some("doc-secret".to_string()));
6360 runtime.block_on(async {
6361 let (status, _) = call_with_auth(
6363 h.router.clone(),
6364 "GET",
6365 "/memory/documents",
6366 None,
6367 Some("Bearer doc-secret"),
6368 )
6369 .await;
6370 assert_eq!(status, StatusCode::OK);
6371
6372 let (status, _) = call_with_auth(
6374 h.router.clone(),
6375 "GET",
6376 "/memory/documents/00000000-0000-7000-8000-000000000000",
6377 None,
6378 Some("Bearer doc-secret"),
6379 )
6380 .await;
6381 assert_eq!(status, StatusCode::NOT_FOUND);
6382 });
6383 h.shutdown(&runtime);
6384 }
6385
6386 #[test]
6393 fn tenant_header_default_resolves() {
6394 let runtime = rt();
6395 let h = Harness::new(&runtime);
6396 let r = h.router.clone();
6397 let (status, _body) = runtime.block_on(async {
6398 let req = Request::builder()
6399 .method("GET")
6400 .uri("/memory/00000000-0000-7000-8000-000000000000")
6401 .header("x-solo-tenant", "default")
6402 .body(Body::empty())
6403 .unwrap();
6404 let resp = r.oneshot(req).await.expect("oneshot");
6405 let s = resp.status();
6406 let _b = resp.into_body().collect().await.unwrap().to_bytes();
6407 (s, _b)
6408 });
6409 assert_eq!(status, StatusCode::NOT_FOUND);
6413 h.shutdown(&runtime);
6414 }
6415
6416 #[test]
6418 fn tenant_header_invalid_returns_400() {
6419 let runtime = rt();
6420 let h = Harness::new(&runtime);
6421 let r = h.router.clone();
6422 let (status, body) = runtime.block_on(async {
6423 let req = Request::builder()
6424 .method("GET")
6425 .uri("/memory/00000000-0000-7000-8000-000000000000")
6426 .header("x-solo-tenant", "UPPER")
6427 .body(Body::empty())
6428 .unwrap();
6429 let resp = r.oneshot(req).await.expect("oneshot");
6430 let s = resp.status();
6431 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
6432 let v: Value = serde_json::from_slice(&bytes).unwrap_or(Value::Null);
6433 (s, v)
6434 });
6435 assert_eq!(status, StatusCode::BAD_REQUEST);
6436 let msg = body.get("error").and_then(|e| e.as_str()).unwrap_or("");
6437 assert!(
6438 msg.to_lowercase().contains("tenant") || msg.to_lowercase().contains("invalid"),
6439 "error must mention tenant/invalid: {msg}"
6440 );
6441 h.shutdown(&runtime);
6442 }
6443
6444 #[test]
6446 fn tenant_header_unknown_returns_404() {
6447 let runtime = rt();
6448 let h = Harness::new(&runtime);
6449 let r = h.router.clone();
6450 let (status, _body) = runtime.block_on(async {
6451 let req = Request::builder()
6452 .method("GET")
6453 .uri("/memory/00000000-0000-7000-8000-000000000000")
6454 .header("x-solo-tenant", "never-registered")
6455 .body(Body::empty())
6456 .unwrap();
6457 let resp = r.oneshot(req).await.expect("oneshot");
6458 let s = resp.status();
6459 let _b = resp.into_body().collect().await.unwrap().to_bytes();
6460 (s, _b)
6461 });
6462 assert_eq!(status, StatusCode::NOT_FOUND);
6463 h.shutdown(&runtime);
6464 }
6465
6466 #[test]
6470 fn tenant_header_missing_defaults_to_state_default_tenant() {
6471 let runtime = rt();
6472 let h = Harness::new(&runtime);
6473 let r = h.router.clone();
6474 let (status, _body) = runtime.block_on(async {
6475 let req = Request::builder()
6476 .method("GET")
6477 .uri("/memory/00000000-0000-7000-8000-000000000000")
6478 .body(Body::empty())
6479 .unwrap();
6480 let resp = r.oneshot(req).await.expect("oneshot");
6481 let s = resp.status();
6482 let _b = resp.into_body().collect().await.unwrap().to_bytes();
6483 (s, _b)
6484 });
6485 assert_eq!(status, StatusCode::NOT_FOUND);
6486 h.shutdown(&runtime);
6487 }
6488
6489 fn seed_episode(
6503 conn: &rusqlite::Connection,
6504 memory_id: &str,
6505 ts_ms: i64,
6506 content: &str,
6507 ) -> i64 {
6508 conn.execute(
6509 "INSERT INTO episodes
6510 (memory_id, ts_ms, source_type, content,
6511 encoding_context_json, tier, status,
6512 confidence, strength, salience,
6513 created_at_ms, updated_at_ms)
6514 VALUES (?1, ?2, 'user_message', ?3,
6515 '{}', 'hot', 'active',
6516 1.0, 0.5, 0.5, ?2, ?2)",
6517 rusqlite::params![memory_id, ts_ms, content],
6518 )
6519 .expect("seed episode");
6520 conn.last_insert_rowid()
6521 }
6522
6523 fn seed_cluster_row(conn: &rusqlite::Connection, cluster_id: &str, created_at_ms: i64) {
6524 conn.execute(
6525 "INSERT INTO clusters (cluster_id, coherence, created_at_ms)
6526 VALUES (?1, 0.5, ?2)",
6527 rusqlite::params![cluster_id, created_at_ms],
6528 )
6529 .expect("seed cluster");
6530 }
6531
6532 fn seed_cluster_member(conn: &rusqlite::Connection, cluster_id: &str, memory_id: &str) {
6533 conn.execute(
6534 "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?1, ?2)",
6535 rusqlite::params![cluster_id, memory_id],
6536 )
6537 .expect("seed cluster_episodes");
6538 }
6539
6540 fn seed_document_row(conn: &rusqlite::Connection, doc_id: &str, title: &str) {
6541 conn.execute(
6542 "INSERT INTO documents
6543 (doc_id, source, title, mime_type, ingested_at_ms,
6544 modified_at_ms, status, chunk_count, content_hash, byte_size)
6545 VALUES (?1, ?2, ?3, 'text/plain', 0, NULL,
6546 'active', 0, ?1, NULL)",
6547 rusqlite::params![doc_id, format!("/tmp/{title}.txt"), title],
6548 )
6549 .expect("seed doc");
6550 }
6551
6552 fn seed_chunk_row(
6553 conn: &rusqlite::Connection,
6554 chunk_id: &str,
6555 doc_id: &str,
6556 chunk_index: i64,
6557 content: &str,
6558 ) {
6559 conn.execute(
6560 "INSERT INTO document_chunks
6561 (chunk_id, doc_id, chunk_index, content,
6562 token_count, start_offset, end_offset, created_at_ms)
6563 VALUES (?1, ?2, ?3, ?4, 1, 0, ?5, 0)",
6564 rusqlite::params![chunk_id, doc_id, chunk_index, content, content.len() as i64],
6565 )
6566 .expect("seed chunk");
6567 }
6568
6569 fn seed_triple_row(
6570 conn: &rusqlite::Connection,
6571 triple_id: &str,
6572 subject: &str,
6573 predicate: &str,
6574 object: &str,
6575 source_episode_rowid: Option<i64>,
6576 ) {
6577 conn.execute(
6578 "INSERT INTO triples
6579 (triple_id, subject_id, predicate, object_id, object_kind,
6580 valid_from_ms, valid_to_ms, confidence, provenance_json,
6581 status, created_at_ms, updated_at_ms, source_episode_id)
6582 VALUES (?1, ?2, ?3, ?4, 'literal', 0, NULL, 0.9, '{}',
6583 'active', 0, 0, ?5)",
6584 rusqlite::params![triple_id, subject, predicate, object, source_episode_rowid],
6585 )
6586 .expect("seed triple");
6587 }
6588
6589 fn seed_abstraction_row(
6592 conn: &rusqlite::Connection,
6593 abstraction_id: &str,
6594 cluster_id: &str,
6595 content: &str,
6596 ) {
6597 conn.execute(
6598 "INSERT INTO semantic_abstractions
6599 (abstraction_id, cluster_id, content, provenance_json,
6600 confidence, created_at_ms)
6601 VALUES (?1, ?2, ?3, '{}', 0.9, 0)",
6602 rusqlite::params![abstraction_id, cluster_id, content],
6603 )
6604 .expect("seed abstraction");
6605 }
6606
6607 fn percent_encode_node_id(node_id: &str) -> String {
6610 let mut out = String::with_capacity(node_id.len());
6611 for c in node_id.chars() {
6612 match c {
6613 ':' => out.push_str("%3A"),
6614 ' ' => out.push_str("%20"),
6615 '&' => out.push_str("%26"),
6616 '+' => out.push_str("%2B"),
6617 '?' => out.push_str("%3F"),
6618 '#' => out.push_str("%23"),
6619 _ => out.push(c),
6620 }
6621 }
6622 out
6623 }
6624
6625 fn graph_uri(node_id: &str, kind: &str) -> String {
6626 let encoded = percent_encode_node_id(node_id);
6627 format!("/v1/graph/expand?node_id={encoded}&kind={kind}")
6628 }
6629
6630 fn graph_uri_with_limit(node_id: &str, kind: &str, limit: u32) -> String {
6631 let encoded = percent_encode_node_id(node_id);
6632 format!("/v1/graph/expand?node_id={encoded}&kind={kind}&limit={limit}")
6633 }
6634
6635 #[test]
6636 fn expand_cluster_member_from_episode_returns_clusters() {
6637 let runtime = rt();
6638 let h = Harness::new(&runtime);
6639 let memory_id = "11111111-1111-7000-8000-000000000001";
6640 {
6641 let conn = h.open_db();
6642 seed_episode(&conn, memory_id, 100, "ep content");
6643 seed_cluster_row(&conn, "cl-a", 200);
6644 seed_cluster_member(&conn, "cl-a", memory_id);
6645 }
6646 let node_id = format!("ep:{memory_id}");
6647 let (status, body) = runtime.block_on(call(
6648 h.router.clone(),
6649 "GET",
6650 &graph_uri(&node_id, "cluster_member"),
6651 None,
6652 ));
6653 assert_eq!(status, StatusCode::OK, "body: {body}");
6654 let nodes = body.get("nodes").and_then(|v| v.as_array()).expect("nodes array");
6655 let edges = body.get("edges").and_then(|v| v.as_array()).expect("edges array");
6656 assert_eq!(nodes.len(), 1, "{body}");
6657 assert_eq!(nodes[0]["id"], "cl:cl-a");
6658 assert_eq!(nodes[0]["kind"], "cluster");
6659 assert_eq!(edges.len(), 1);
6660 assert_eq!(edges[0]["source"], node_id);
6661 assert_eq!(edges[0]["target"], "cl:cl-a");
6662 assert_eq!(edges[0]["kind"], "cluster_member");
6663 h.shutdown(&runtime);
6664 }
6665
6666 #[test]
6667 fn expand_cluster_member_from_cluster_returns_episodes() {
6668 let runtime = rt();
6669 let h = Harness::new(&runtime);
6670 {
6671 let conn = h.open_db();
6672 seed_cluster_row(&conn, "cl-multi", 500);
6673 for i in 0..5 {
6674 let mid = format!("2222{i}222-2222-7000-8000-000000000001");
6675 seed_episode(&conn, &mid, 100 + i as i64, &format!("content {i}"));
6676 seed_cluster_member(&conn, "cl-multi", &mid);
6677 }
6678 }
6679 let (status, body) = runtime.block_on(call(
6680 h.router.clone(),
6681 "GET",
6682 &graph_uri_with_limit("cl:cl-multi", "cluster_member", 3),
6683 None,
6684 ));
6685 assert_eq!(status, StatusCode::OK, "body: {body}");
6686 let nodes = body["nodes"].as_array().unwrap();
6687 let edges = body["edges"].as_array().unwrap();
6688 assert_eq!(nodes.len(), 3, "limit honored: {body}");
6689 assert_eq!(edges.len(), 3);
6690 for n in nodes {
6691 assert_eq!(n["kind"], "episode");
6692 }
6693 h.shutdown(&runtime);
6694 }
6695
6696 #[test]
6697 fn expand_document_chunk_from_document_returns_chunks() {
6698 let runtime = rt();
6699 let h = Harness::new(&runtime);
6700 let doc_id = "33333333-3333-7000-8000-000000000001";
6701 {
6702 let conn = h.open_db();
6703 seed_document_row(&conn, doc_id, "doc A");
6704 seed_chunk_row(&conn, "c2", doc_id, 2, "chunk 2 text");
6707 seed_chunk_row(&conn, "c0", doc_id, 0, "chunk 0 text");
6708 seed_chunk_row(&conn, "c1", doc_id, 1, "chunk 1 text");
6709 seed_chunk_row(&conn, "c3", doc_id, 3, "chunk 3 text");
6710 }
6711 let node_id = format!("doc:{doc_id}");
6712 let (status, body) = runtime.block_on(call(
6713 h.router.clone(),
6714 "GET",
6715 &graph_uri(&node_id, "document_chunk"),
6716 None,
6717 ));
6718 assert_eq!(status, StatusCode::OK, "body: {body}");
6719 let nodes = body["nodes"].as_array().unwrap();
6720 let edges = body["edges"].as_array().unwrap();
6721 assert_eq!(nodes.len(), 4);
6722 assert_eq!(edges.len(), 4);
6723 assert_eq!(nodes[0]["id"], "chunk:c0");
6725 assert_eq!(nodes[1]["id"], "chunk:c1");
6726 assert_eq!(nodes[2]["id"], "chunk:c2");
6727 assert_eq!(nodes[3]["id"], "chunk:c3");
6728 for e in edges {
6729 assert_eq!(e["kind"], "document_chunk");
6730 }
6731 h.shutdown(&runtime);
6732 }
6733
6734 #[test]
6735 fn expand_document_chunk_from_chunk_returns_parent_document() {
6736 let runtime = rt();
6737 let h = Harness::new(&runtime);
6738 let doc_id = "44444444-4444-7000-8000-000000000001";
6739 {
6740 let conn = h.open_db();
6741 seed_document_row(&conn, doc_id, "parent doc");
6742 seed_chunk_row(&conn, "c-orphan", doc_id, 0, "chunk content");
6743 }
6744 let (status, body) = runtime.block_on(call(
6745 h.router.clone(),
6746 "GET",
6747 &graph_uri("chunk:c-orphan", "document_chunk"),
6748 None,
6749 ));
6750 assert_eq!(status, StatusCode::OK, "body: {body}");
6751 let nodes = body["nodes"].as_array().unwrap();
6752 let edges = body["edges"].as_array().unwrap();
6753 assert_eq!(nodes.len(), 1);
6754 assert_eq!(edges.len(), 1);
6755 assert_eq!(nodes[0]["id"], format!("doc:{doc_id}"));
6756 assert_eq!(edges[0]["source"], "chunk:c-orphan");
6757 assert_eq!(edges[0]["target"], format!("doc:{doc_id}"));
6758 h.shutdown(&runtime);
6759 }
6760
6761 #[test]
6762 fn expand_triple_from_episode_returns_entities() {
6763 let runtime = rt();
6764 let h = Harness::new(&runtime);
6765 let memory_id = "55555555-5555-7000-8000-000000000001";
6766 let rowid;
6767 {
6768 let conn = h.open_db();
6769 rowid = seed_episode(&conn, memory_id, 100, "alice works at anthropic");
6770 seed_triple_row(&conn, "t1", "Alice", "works_at", "Anthropic", Some(rowid));
6772 seed_triple_row(&conn, "t2", "Bob", "lives_in", "NYC", Some(rowid));
6773 }
6774 let node_id = format!("ep:{memory_id}");
6775 let (status, body) = runtime.block_on(call(
6776 h.router.clone(),
6777 "GET",
6778 &graph_uri(&node_id, "triple"),
6779 None,
6780 ));
6781 assert_eq!(status, StatusCode::OK, "body: {body}");
6782 let nodes = body["nodes"].as_array().unwrap();
6783 let edges = body["edges"].as_array().unwrap();
6784 assert_eq!(nodes.len(), 4, "expected 4 unique entity nodes: {body}");
6785 assert_eq!(edges.len(), 2);
6786 let ids: std::collections::HashSet<String> = nodes
6787 .iter()
6788 .map(|n| n["id"].as_str().unwrap().to_string())
6789 .collect();
6790 for expected in ["ent:Alice", "ent:Anthropic", "ent:Bob", "ent:NYC"] {
6791 assert!(ids.contains(expected), "missing {expected} in {body}");
6792 }
6793 for e in edges {
6794 assert_eq!(e["kind"], "triple");
6795 assert!(e["predicate"].is_string(), "predicate set: {body}");
6796 }
6797 h.shutdown(&runtime);
6798 }
6799
6800 #[test]
6801 fn expand_triple_from_entity_returns_episodes() {
6802 let runtime = rt();
6803 let h = Harness::new(&runtime);
6804 {
6805 let conn = h.open_db();
6806 let r1 = seed_episode(
6807 &conn,
6808 "66666666-6666-7000-8000-000000000001",
6809 100,
6810 "alice ep one",
6811 );
6812 let r2 = seed_episode(
6813 &conn,
6814 "66666666-6666-7000-8000-000000000002",
6815 200,
6816 "alice ep two",
6817 );
6818 let r3 = seed_episode(
6819 &conn,
6820 "66666666-6666-7000-8000-000000000003",
6821 300,
6822 "alice ep three",
6823 );
6824 seed_triple_row(&conn, "t1", "Alice", "p", "Bob", Some(r1));
6826 seed_triple_row(&conn, "t2", "Carol", "p", "Alice", Some(r2));
6827 seed_triple_row(&conn, "t3", "Alice", "q", "Dave", Some(r3));
6828 seed_triple_row(&conn, "t-orphan", "Alice", "p", "Eve", None);
6830 }
6831 let (status, body) = runtime.block_on(call(
6832 h.router.clone(),
6833 "GET",
6834 &graph_uri("ent:Alice", "triple"),
6835 None,
6836 ));
6837 assert_eq!(status, StatusCode::OK, "body: {body}");
6838 let nodes = body["nodes"].as_array().unwrap();
6839 let edges = body["edges"].as_array().unwrap();
6840 assert_eq!(nodes.len(), 3, "expected 3 episodes: {body}");
6841 assert_eq!(edges.len(), 3);
6842 for n in nodes {
6843 assert_eq!(n["kind"], "episode");
6844 }
6845 for e in edges {
6846 assert_eq!(e["source"], "ent:Alice");
6847 assert_eq!(e["kind"], "triple");
6848 }
6849 h.shutdown(&runtime);
6850 }
6851
6852 #[test]
6853 fn expand_semantic_from_episode_returns_similar() {
6854 let runtime = rt();
6855 let h = Harness::new(&runtime);
6856 runtime.block_on(async {
6862 let mid1 = post_remember(h.router.clone(), "alpha alpha alpha").await;
6863 let _mid2 = post_remember(h.router.clone(), "beta beta beta").await;
6864 let _mid3 = post_remember(h.router.clone(), "gamma gamma gamma").await;
6865 let (status, body) = call(
6867 h.router.clone(),
6868 "GET",
6869 &graph_uri_with_limit(&format!("ep:{mid1}"), "semantic", 5),
6870 None,
6871 )
6872 .await;
6873 assert_eq!(status, StatusCode::OK, "body: {body}");
6874 let nodes = body["nodes"].as_array().unwrap();
6875 let edges = body["edges"].as_array().unwrap();
6876 for n in nodes {
6878 assert_ne!(
6879 n["id"].as_str().unwrap(),
6880 format!("ep:{mid1}"),
6881 "self must be excluded: {body}"
6882 );
6883 }
6884 for e in edges {
6886 assert_eq!(e["kind"], "semantic");
6887 assert!(e["weight"].is_number(), "weight set: {body}");
6888 }
6889 });
6890 h.shutdown(&runtime);
6891 }
6892
6893 async fn post_remember(router: axum::Router, content: &str) -> String {
6895 let (status, body) = call(
6896 router,
6897 "POST",
6898 "/memory",
6899 Some(json!({ "content": content })),
6900 )
6901 .await;
6902 assert_eq!(status, StatusCode::OK, "post failed: {body}");
6903 body["memory_id"].as_str().unwrap().to_string()
6904 }
6905
6906 #[test]
6907 fn expand_400_on_invalid_kind() {
6908 let runtime = rt();
6909 let h = Harness::new(&runtime);
6910 let (status, _body) = runtime.block_on(call(
6911 h.router.clone(),
6912 "GET",
6913 "/v1/graph/expand?node_id=ep:any&kind=banana",
6914 None,
6915 ));
6916 assert!(
6918 status == StatusCode::BAD_REQUEST || status == StatusCode::UNPROCESSABLE_ENTITY,
6919 "expected 400/422 for bad kind, got {status}"
6920 );
6921 h.shutdown(&runtime);
6922 }
6923
6924 #[test]
6925 fn expand_400_on_invalid_node_for_kind() {
6926 let runtime = rt();
6927 let h = Harness::new(&runtime);
6928 let (status, body) = runtime.block_on(call(
6930 h.router.clone(),
6931 "GET",
6932 &graph_uri("cl:doesnt-matter", "semantic"),
6933 None,
6934 ));
6935 assert_eq!(status, StatusCode::BAD_REQUEST);
6936 assert!(
6937 body["error"]
6938 .as_str()
6939 .is_some_and(|s| s.contains("semantic only valid for episode")),
6940 "got: {body}"
6941 );
6942 h.shutdown(&runtime);
6943 }
6944
6945 #[test]
6946 fn expand_404_on_missing_node_id() {
6947 let runtime = rt();
6948 let h = Harness::new(&runtime);
6949 let (status, body) = runtime.block_on(call(
6950 h.router.clone(),
6951 "GET",
6952 &graph_uri("ep:99999999-9999-7000-8000-000000000999", "cluster_member"),
6953 None,
6954 ));
6955 assert_eq!(status, StatusCode::NOT_FOUND, "{body}");
6956 h.shutdown(&runtime);
6957 }
6958
6959 #[test]
6960 fn expand_limit_clamped_at_100() {
6961 let runtime = rt();
6962 let h = Harness::new(&runtime);
6963 {
6965 let conn = h.open_db();
6966 seed_cluster_row(&conn, "cl-huge", 1_000);
6967 for i in 0..150 {
6968 let mid = format!("77777777-7777-7000-8000-{:012}", i);
6969 seed_episode(&conn, &mid, 100 + i as i64, &format!("content {i}"));
6970 seed_cluster_member(&conn, "cl-huge", &mid);
6971 }
6972 }
6973 let (status, body) = runtime.block_on(call(
6974 h.router.clone(),
6975 "GET",
6976 &graph_uri_with_limit("cl:cl-huge", "cluster_member", 999),
6977 None,
6978 ));
6979 assert_eq!(status, StatusCode::OK, "body: {body}");
6980 let nodes = body["nodes"].as_array().unwrap();
6981 assert_eq!(
6982 nodes.len(),
6983 100,
6984 "limit must be silently clamped to 100, got {}",
6985 nodes.len()
6986 );
6987 h.shutdown(&runtime);
6988 }
6989
6990 #[test]
6991 fn expand_bad_node_id_prefix_returns_400() {
6992 let runtime = rt();
6993 let h = Harness::new(&runtime);
6994 let (status, body) = runtime.block_on(call(
6995 h.router.clone(),
6996 "GET",
6997 "/v1/graph/expand?node_id=garbage&kind=cluster_member",
6998 None,
6999 ));
7000 assert_eq!(status, StatusCode::BAD_REQUEST);
7001 assert!(
7002 body["error"]
7003 .as_str()
7004 .is_some_and(|s| s.contains("node_id must be")),
7005 "got: {body}"
7006 );
7007 h.shutdown(&runtime);
7008 }
7009
7010 #[test]
7011 fn expand_respects_tenant_scoping_via_unknown_tenant_header() {
7012 let runtime = rt();
7017 let h = Harness::new(&runtime);
7018 let memory_id = "88888888-8888-7000-8000-000000000001";
7022 {
7023 let conn = h.open_db();
7024 seed_episode(&conn, memory_id, 100, "scoped");
7025 seed_cluster_row(&conn, "cl-scoped", 200);
7026 seed_cluster_member(&conn, "cl-scoped", memory_id);
7027 }
7028 let node_id = format!("ep:{memory_id}");
7029 let r = h.router.clone();
7030 let (status, _body) = runtime.block_on(async {
7031 let req = Request::builder()
7032 .method("GET")
7033 .uri(graph_uri(&node_id, "cluster_member"))
7034 .header("x-solo-tenant", "never-registered-tenant")
7035 .body(Body::empty())
7036 .unwrap();
7037 let resp = r.oneshot(req).await.expect("oneshot");
7038 let s = resp.status();
7039 let _b = resp.into_body().collect().await.unwrap().to_bytes();
7040 (s, _b)
7041 });
7042 assert_eq!(status, StatusCode::NOT_FOUND);
7045 h.shutdown(&runtime);
7046 }
7047
7048 #[test]
7049 fn expand_respects_auth_when_enabled() {
7050 let runtime = rt();
7051 let h = Harness::new_with_auth(&runtime, Some("graph-secret".into()));
7052 let (status, _) = runtime.block_on(call(
7054 h.router.clone(),
7055 "GET",
7056 &graph_uri("ep:any", "cluster_member"),
7057 None,
7058 ));
7059 assert_eq!(status, StatusCode::UNAUTHORIZED);
7060 let (status, _) = runtime.block_on(call_with_auth(
7062 h.router.clone(),
7063 "GET",
7064 &graph_uri("ep:99999999-9999-7000-8000-000000000999", "cluster_member"),
7065 None,
7066 Some("Bearer graph-secret"),
7067 ));
7068 assert_eq!(status, StatusCode::NOT_FOUND);
7069 h.shutdown(&runtime);
7070 }
7071
7072 #[test]
7073 fn expand_works_when_auth_none() {
7074 let runtime = rt();
7075 let h = Harness::new(&runtime);
7076 let (status, _) = runtime.block_on(call(
7079 h.router.clone(),
7080 "GET",
7081 &graph_uri("ep:99999999-9999-7000-8000-000000000999", "cluster_member"),
7082 None,
7083 ));
7084 assert_eq!(status, StatusCode::NOT_FOUND);
7085 h.shutdown(&runtime);
7086 }
7087
7088 async fn call_with_headers(
7101 router: axum::Router,
7102 method: &str,
7103 uri: &str,
7104 ) -> (StatusCode, axum::http::HeaderMap, Value) {
7105 let req = Request::builder()
7106 .method(method)
7107 .uri(uri)
7108 .header("content-length", "0")
7109 .body(Body::empty())
7110 .unwrap();
7111 let resp = router.oneshot(req).await.expect("oneshot");
7112 let status = resp.status();
7113 let headers = resp.headers().clone();
7114 let body_bytes = resp.into_body().collect().await.unwrap().to_bytes();
7115 let v: Value = if body_bytes.is_empty() {
7116 Value::Null
7117 } else {
7118 serde_json::from_slice(&body_bytes).unwrap_or(Value::Null)
7119 };
7120 (status, headers, v)
7121 }
7122
7123 #[test]
7124 fn nodes_returns_all_kinds_when_no_filter() {
7125 let runtime = rt();
7126 let h = Harness::new(&runtime);
7127 {
7128 let conn = h.open_db();
7129 let rowid = seed_episode(
7130 &conn,
7131 "aaaaaaaa-0000-7000-8000-000000000001",
7132 100,
7133 "episode one",
7134 );
7135 seed_document_row(&conn, "doc-1", "doc one");
7136 seed_chunk_row(&conn, "chunk-1", "doc-1", 0, "chunk one body");
7137 seed_cluster_row(&conn, "cl-one", 200);
7138 seed_triple_row(
7139 &conn,
7140 "t-one",
7141 "Alice",
7142 "knows",
7143 "Bob",
7144 Some(rowid),
7145 );
7146 }
7147 let (status, body) = runtime.block_on(call(
7148 h.router.clone(),
7149 "GET",
7150 "/v1/graph/nodes",
7151 None,
7152 ));
7153 assert_eq!(status, StatusCode::OK, "body: {body}");
7154 let nodes = body["nodes"].as_array().unwrap();
7155 let kinds: std::collections::HashSet<&str> = nodes
7156 .iter()
7157 .map(|n| n["kind"].as_str().unwrap())
7158 .collect();
7159 for expected in ["episode", "document", "chunk", "cluster", "entity"] {
7160 assert!(
7161 kinds.contains(expected),
7162 "expected {expected} kind in response: {body}"
7163 );
7164 }
7165 h.shutdown(&runtime);
7166 }
7167
7168 #[test]
7169 fn nodes_filter_by_single_kind() {
7170 let runtime = rt();
7171 let h = Harness::new(&runtime);
7172 {
7173 let conn = h.open_db();
7174 seed_episode(&conn, "bbbbbbbb-0000-7000-8000-000000000001", 100, "ep");
7175 seed_document_row(&conn, "doc-only", "d");
7176 seed_cluster_row(&conn, "cl-only", 300);
7177 }
7178 let (status, body) = runtime.block_on(call(
7179 h.router.clone(),
7180 "GET",
7181 "/v1/graph/nodes?kind=episode",
7182 None,
7183 ));
7184 assert_eq!(status, StatusCode::OK, "body: {body}");
7185 let nodes = body["nodes"].as_array().unwrap();
7186 assert!(!nodes.is_empty(), "{body}");
7187 for n in nodes {
7188 assert_eq!(n["kind"], "episode", "kind filter must be exclusive: {body}");
7189 }
7190 h.shutdown(&runtime);
7191 }
7192
7193 #[test]
7194 fn nodes_filter_by_multiple_kinds() {
7195 let runtime = rt();
7196 let h = Harness::new(&runtime);
7197 {
7198 let conn = h.open_db();
7199 seed_episode(&conn, "cccccccc-0000-7000-8000-000000000001", 100, "ep");
7200 seed_document_row(&conn, "doc-multi", "d");
7201 seed_cluster_row(&conn, "cl-multi", 300);
7202 }
7203 let (status, body) = runtime.block_on(call(
7204 h.router.clone(),
7205 "GET",
7206 "/v1/graph/nodes?kind=episode,document",
7207 None,
7208 ));
7209 assert_eq!(status, StatusCode::OK, "body: {body}");
7210 let nodes = body["nodes"].as_array().unwrap();
7211 let kinds: std::collections::HashSet<&str> = nodes
7212 .iter()
7213 .map(|n| n["kind"].as_str().unwrap())
7214 .collect();
7215 assert!(kinds.contains("episode"), "{body}");
7216 assert!(kinds.contains("document"), "{body}");
7217 assert!(
7218 !kinds.contains("cluster"),
7219 "cluster must be filtered out: {body}"
7220 );
7221 h.shutdown(&runtime);
7222 }
7223
7224 #[test]
7225 fn nodes_entity_synthesis_caps_at_200() {
7226 let runtime = rt();
7227 let h = Harness::new(&runtime);
7228 {
7229 let conn = h.open_db();
7230 let rowid = seed_episode(
7235 &conn,
7236 "dddddddd-0000-7000-8000-000000000001",
7237 100,
7238 "ep",
7239 );
7240 for i in 0..250 {
7241 let triple_id = format!("t-cap-{i:03}");
7242 let obj = format!("Entity{i:03}");
7243 seed_triple_row(&conn, &triple_id, "Alice", "knows", &obj, Some(rowid));
7244 }
7245 }
7246 let (status, headers, body) = runtime.block_on(call_with_headers(
7247 h.router.clone(),
7248 "GET",
7249 "/v1/graph/nodes?kind=entity&limit=500",
7250 ));
7251 assert_eq!(status, StatusCode::OK, "body: {body}");
7252 let nodes = body["nodes"].as_array().unwrap();
7253 assert_eq!(
7254 nodes.len(),
7255 200,
7256 "entity cap must be enforced at 200, got {}",
7257 nodes.len()
7258 );
7259 assert_eq!(
7260 headers
7261 .get("x-solo-entity-cap-reached")
7262 .and_then(|v| v.to_str().ok()),
7263 Some("true"),
7264 "cap-reached header missing: headers={headers:?}"
7265 );
7266 for n in nodes {
7267 assert_eq!(n["kind"], "entity");
7268 }
7269 h.shutdown(&runtime);
7270 }
7271
7272 #[test]
7273 fn nodes_since_until_filter_works() {
7274 let runtime = rt();
7275 let h = Harness::new(&runtime);
7276 {
7277 let conn = h.open_db();
7278 seed_episode(
7279 &conn,
7280 "eeeeeeee-0000-7000-8000-000000000001",
7281 100,
7282 "early",
7283 );
7284 seed_episode(
7285 &conn,
7286 "eeeeeeee-0000-7000-8000-000000000002",
7287 500,
7288 "middle",
7289 );
7290 seed_episode(
7291 &conn,
7292 "eeeeeeee-0000-7000-8000-000000000003",
7293 1000,
7294 "late",
7295 );
7296 }
7297 let (status, body) = runtime.block_on(call(
7298 h.router.clone(),
7299 "GET",
7300 "/v1/graph/nodes?kind=episode&since_ms=400&until_ms=600",
7301 None,
7302 ));
7303 assert_eq!(status, StatusCode::OK, "body: {body}");
7304 let nodes = body["nodes"].as_array().unwrap();
7305 assert_eq!(nodes.len(), 1, "{body}");
7306 assert_eq!(
7307 nodes[0]["id"],
7308 "ep:eeeeeeee-0000-7000-8000-000000000002"
7309 );
7310 h.shutdown(&runtime);
7311 }
7312
7313 #[test]
7314 fn nodes_pagination_round_trip() {
7315 let runtime = rt();
7316 let h = Harness::new(&runtime);
7317 {
7318 let conn = h.open_db();
7319 for i in 0..150 {
7320 let mid = format!("f0000000-0000-7000-8000-{i:012}");
7321 seed_episode(&conn, &mid, 1_000 + i as i64, "page");
7324 }
7325 }
7326 let limit = 50u32;
7327 let mut seen: std::collections::HashSet<String> = Default::default();
7328 let mut next_cursor: Option<String> = None;
7329 for page_idx in 0..4 {
7330 let cursor_param = next_cursor
7331 .as_deref()
7332 .map(|c| format!("&cursor={c}"))
7333 .unwrap_or_default();
7334 let uri = format!(
7335 "/v1/graph/nodes?kind=episode&limit={limit}{cursor_param}"
7336 );
7337 let (status, body) =
7338 runtime.block_on(call(h.router.clone(), "GET", &uri, None));
7339 assert_eq!(status, StatusCode::OK, "page {page_idx}: {body}");
7340 let nodes = body["nodes"].as_array().unwrap();
7341 assert!(
7342 nodes.len() <= limit as usize,
7343 "page {page_idx} over-fetched: {body}"
7344 );
7345 for n in nodes {
7346 let id = n["id"].as_str().unwrap().to_string();
7347 assert!(seen.insert(id.clone()), "duplicate id across pages: {id}");
7348 }
7349 next_cursor = body
7350 .get("next_cursor")
7351 .and_then(|v| v.as_str())
7352 .map(|s| s.to_string());
7353 if next_cursor.is_none() {
7354 break;
7355 }
7356 }
7357 assert_eq!(
7358 seen.len(),
7359 150,
7360 "expected 150 distinct ids across pages, got {}",
7361 seen.len()
7362 );
7363 assert!(
7364 next_cursor.is_none(),
7365 "cursor should be null after last page; got {next_cursor:?}"
7366 );
7367 h.shutdown(&runtime);
7368 }
7369
7370 #[test]
7371 fn nodes_respects_tenant_scoping() {
7372 let runtime = rt();
7373 let h = Harness::new(&runtime);
7374 {
7375 let conn = h.open_db();
7376 seed_episode(
7377 &conn,
7378 "11110000-0000-7000-8000-000000000001",
7379 100,
7380 "tenant scope",
7381 );
7382 }
7383 let r = h.router.clone();
7386 let (status, _body) = runtime.block_on(async {
7387 let req = Request::builder()
7388 .method("GET")
7389 .uri("/v1/graph/nodes")
7390 .header("x-solo-tenant", "never-registered-tenant")
7391 .body(Body::empty())
7392 .unwrap();
7393 let resp = r.oneshot(req).await.expect("oneshot");
7394 let s = resp.status();
7395 let _b = resp.into_body().collect().await.unwrap().to_bytes();
7396 (s, _b)
7397 });
7398 assert_eq!(status, StatusCode::NOT_FOUND);
7399 h.shutdown(&runtime);
7400 }
7401
7402 #[test]
7403 fn nodes_respects_auth_when_enabled() {
7404 let runtime = rt();
7405 let h = Harness::new_with_auth(&runtime, Some("nodes-secret".into()));
7406 let (status, _) = runtime.block_on(call(
7407 h.router.clone(),
7408 "GET",
7409 "/v1/graph/nodes",
7410 None,
7411 ));
7412 assert_eq!(
7413 status,
7414 StatusCode::UNAUTHORIZED,
7415 "must reject unauthenticated request"
7416 );
7417 let (status, _) = runtime.block_on(call_with_auth(
7418 h.router.clone(),
7419 "GET",
7420 "/v1/graph/nodes",
7421 None,
7422 Some("Bearer nodes-secret"),
7423 ));
7424 assert_eq!(status, StatusCode::OK, "must pass through with bearer");
7425 h.shutdown(&runtime);
7426 }
7427
7428 #[test]
7429 fn nodes_works_with_auth_none() {
7430 let runtime = rt();
7431 let h = Harness::new(&runtime);
7432 let (status, body) = runtime.block_on(call(
7433 h.router.clone(),
7434 "GET",
7435 "/v1/graph/nodes",
7436 None,
7437 ));
7438 assert_eq!(status, StatusCode::OK, "{body}");
7439 assert!(body.get("nodes").is_some());
7440 h.shutdown(&runtime);
7441 }
7442
7443 #[test]
7446 fn edges_returns_all_default_kinds() {
7447 let runtime = rt();
7448 let h = Harness::new(&runtime);
7449 {
7450 let conn = h.open_db();
7451 let rowid = seed_episode(
7452 &conn,
7453 "22220000-0000-7000-8000-000000000001",
7454 100,
7455 "ep src",
7456 );
7457 seed_triple_row(&conn, "t-def", "Alice", "knows", "Bob", Some(rowid));
7458 seed_document_row(&conn, "doc-e", "doc");
7459 seed_chunk_row(&conn, "c-e", "doc-e", 0, "chunk");
7460 seed_cluster_row(&conn, "cl-e", 200);
7461 seed_cluster_member(
7462 &conn,
7463 "cl-e",
7464 "22220000-0000-7000-8000-000000000001",
7465 );
7466 }
7467 let (status, body) = runtime.block_on(call(
7468 h.router.clone(),
7469 "GET",
7470 "/v1/graph/edges",
7471 None,
7472 ));
7473 assert_eq!(status, StatusCode::OK, "body: {body}");
7474 let edges = body["edges"].as_array().unwrap();
7475 let kinds: std::collections::HashSet<&str> = edges
7476 .iter()
7477 .map(|e| e["kind"].as_str().unwrap())
7478 .collect();
7479 assert!(kinds.contains("triple"), "{body}");
7480 assert!(kinds.contains("document_chunk"), "{body}");
7481 assert!(kinds.contains("cluster_member"), "{body}");
7482 assert!(
7483 !kinds.contains("semantic"),
7484 "semantic is NOT in default response: {body}"
7485 );
7486 h.shutdown(&runtime);
7487 }
7488
7489 #[test]
7490 fn edges_filter_by_node_id_finds_incident_edges() {
7491 let runtime = rt();
7492 let h = Harness::new(&runtime);
7493 let memory_id = "33330000-0000-7000-8000-000000000001";
7494 {
7495 let conn = h.open_db();
7496 let rowid = seed_episode(&conn, memory_id, 100, "ep multi-triple");
7497 seed_triple_row(&conn, "t-a", "Alice", "p", "Bob", Some(rowid));
7498 seed_triple_row(&conn, "t-b", "Alice", "p", "Carol", Some(rowid));
7499 seed_triple_row(&conn, "t-c", "Alice", "p", "Dave", Some(rowid));
7500 let decoy_rowid = seed_episode(
7502 &conn,
7503 "33330000-0000-7000-8000-000000000999",
7504 200,
7505 "decoy",
7506 );
7507 seed_triple_row(
7508 &conn,
7509 "t-decoy",
7510 "Alice",
7511 "p",
7512 "Eve",
7513 Some(decoy_rowid),
7514 );
7515 }
7516 let uri = format!(
7517 "/v1/graph/edges?type=triple&node_id={}",
7518 percent_encode_node_id(&format!("ep:{memory_id}"))
7519 );
7520 let (status, body) =
7521 runtime.block_on(call(h.router.clone(), "GET", &uri, None));
7522 assert_eq!(status, StatusCode::OK, "body: {body}");
7523 let edges = body["edges"].as_array().unwrap();
7524 assert_eq!(edges.len(), 3, "expected 3 incident edges: {body}");
7525 for e in edges {
7526 assert_eq!(e["source"], format!("ep:{memory_id}"));
7527 assert_eq!(e["kind"], "triple");
7528 }
7529 h.shutdown(&runtime);
7530 }
7531
7532 #[test]
7533 fn edges_filter_by_type_works() {
7534 let runtime = rt();
7535 let h = Harness::new(&runtime);
7536 {
7537 let conn = h.open_db();
7538 let rowid = seed_episode(
7539 &conn,
7540 "44440000-0000-7000-8000-000000000001",
7541 100,
7542 "ep",
7543 );
7544 seed_triple_row(&conn, "t-only", "Alice", "p", "Bob", Some(rowid));
7545 seed_document_row(&conn, "doc-skip", "doc");
7546 seed_chunk_row(&conn, "c-skip", "doc-skip", 0, "chunk");
7547 }
7548 let (status, body) = runtime.block_on(call(
7549 h.router.clone(),
7550 "GET",
7551 "/v1/graph/edges?type=triple",
7552 None,
7553 ));
7554 assert_eq!(status, StatusCode::OK, "{body}");
7555 let edges = body["edges"].as_array().unwrap();
7556 assert!(!edges.is_empty(), "{body}");
7557 for e in edges {
7558 assert_eq!(e["kind"], "triple", "{body}");
7559 }
7560 h.shutdown(&runtime);
7561 }
7562
7563 #[test]
7564 fn edges_rejects_semantic_type_with_400() {
7565 let runtime = rt();
7566 let h = Harness::new(&runtime);
7567 let (status, body) = runtime.block_on(call(
7568 h.router.clone(),
7569 "GET",
7570 "/v1/graph/edges?type=semantic",
7571 None,
7572 ));
7573 assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
7574 let err = body["error"].as_str().unwrap_or_default();
7575 assert!(
7576 err.contains("/v1/graph/neighbors"),
7577 "error must point to /v1/graph/neighbors: {body}"
7578 );
7579 h.shutdown(&runtime);
7580 }
7581
7582 #[test]
7583 fn edges_pagination_round_trip() {
7584 let runtime = rt();
7585 let h = Harness::new(&runtime);
7586 {
7587 let conn = h.open_db();
7588 let rowid = seed_episode(
7589 &conn,
7590 "55550000-0000-7000-8000-000000000001",
7591 100,
7592 "ep big",
7593 );
7594 for i in 0..60 {
7596 let tid = format!("t-page-{i:03}");
7597 let obj = format!("Obj{i:03}");
7598 seed_triple_row(&conn, &tid, "Alice", "p", &obj, Some(rowid));
7599 }
7600 }
7601 let limit = 25u32;
7602 let mut seen: std::collections::HashSet<String> = Default::default();
7603 let mut next_cursor: Option<String> = None;
7604 for page_idx in 0..5 {
7605 let cursor_param = next_cursor
7606 .as_deref()
7607 .map(|c| format!("&cursor={c}"))
7608 .unwrap_or_default();
7609 let uri = format!(
7610 "/v1/graph/edges?type=triple&limit={limit}{cursor_param}"
7611 );
7612 let (status, body) =
7613 runtime.block_on(call(h.router.clone(), "GET", &uri, None));
7614 assert_eq!(status, StatusCode::OK, "page {page_idx}: {body}");
7615 let edges = body["edges"].as_array().unwrap();
7616 for e in edges {
7617 let id = e["id"].as_str().unwrap().to_string();
7618 assert!(seen.insert(id.clone()), "duplicate edge id: {id}");
7619 }
7620 next_cursor = body
7621 .get("next_cursor")
7622 .and_then(|v| v.as_str())
7623 .map(|s| s.to_string());
7624 if next_cursor.is_none() {
7625 break;
7626 }
7627 }
7628 assert_eq!(
7629 seen.len(),
7630 60,
7631 "expected 60 distinct edges, got {}",
7632 seen.len()
7633 );
7634 assert!(next_cursor.is_none(), "expected exhausted cursor");
7635 h.shutdown(&runtime);
7636 }
7637
7638 #[test]
7639 fn edges_respects_tenant_scoping() {
7640 let runtime = rt();
7641 let h = Harness::new(&runtime);
7642 {
7643 let conn = h.open_db();
7644 let rowid = seed_episode(
7645 &conn,
7646 "66660000-0000-7000-8000-000000000001",
7647 100,
7648 "ep",
7649 );
7650 seed_triple_row(&conn, "t-tenant", "Alice", "p", "Bob", Some(rowid));
7651 }
7652 let r = h.router.clone();
7653 let (status, _) = runtime.block_on(async {
7654 let req = Request::builder()
7655 .method("GET")
7656 .uri("/v1/graph/edges")
7657 .header("x-solo-tenant", "never-registered-tenant")
7658 .body(Body::empty())
7659 .unwrap();
7660 let resp = r.oneshot(req).await.expect("oneshot");
7661 let s = resp.status();
7662 let _b = resp.into_body().collect().await.unwrap().to_bytes();
7663 (s, _b)
7664 });
7665 assert_eq!(status, StatusCode::NOT_FOUND);
7666 h.shutdown(&runtime);
7667 }
7668
7669 #[test]
7670 fn edges_respects_auth_when_enabled() {
7671 let runtime = rt();
7672 let h = Harness::new_with_auth(&runtime, Some("edges-secret".into()));
7673 let (status, _) = runtime.block_on(call(
7674 h.router.clone(),
7675 "GET",
7676 "/v1/graph/edges",
7677 None,
7678 ));
7679 assert_eq!(status, StatusCode::UNAUTHORIZED);
7680 let (status, _) = runtime.block_on(call_with_auth(
7681 h.router.clone(),
7682 "GET",
7683 "/v1/graph/edges",
7684 None,
7685 Some("Bearer edges-secret"),
7686 ));
7687 assert_eq!(status, StatusCode::OK);
7688 h.shutdown(&runtime);
7689 }
7690
7691 fn inspect_uri(node_id: &str) -> String {
7702 format!("/v1/graph/inspect/{}", percent_encode_node_id(node_id))
7706 }
7707
7708 #[test]
7709 fn inspect_episode_returns_full_text_plus_triples_out() {
7710 let runtime = rt();
7711 let h = Harness::new(&runtime);
7712 let memory_id = "a1110000-0000-7000-8000-000000000001";
7713 let full_text = "Met Alice for coffee at the new place. She mentioned the project is on track but they're hitting issues with the deploy pipeline.";
7714 {
7715 let conn = h.open_db();
7716 let rowid = seed_episode(&conn, memory_id, 1_715_625_600_000, full_text);
7717 seed_triple_row(&conn, "t-ep-1", "user", "met_with", "Alice", Some(rowid));
7718 seed_triple_row(&conn, "t-ep-2", "user", "discussed", "deploy_pipeline", Some(rowid));
7719 seed_triple_row(&conn, "t-ep-3", "Alice", "works_on", "project", Some(rowid));
7720 }
7721 let (status, body) = runtime.block_on(call(
7722 h.router.clone(),
7723 "GET",
7724 &inspect_uri(&format!("ep:{memory_id}")),
7725 None,
7726 ));
7727 assert_eq!(status, StatusCode::OK, "body: {body}");
7728 assert_eq!(body["node"]["kind"], "episode");
7729 assert_eq!(body["node"]["id"], format!("ep:{memory_id}"));
7730 assert_eq!(
7731 body["full_text"].as_str().unwrap(),
7732 full_text,
7733 "full_text must match episodes.content verbatim, untruncated"
7734 );
7735 let triples_out = body["triples_out"].as_array().unwrap();
7736 assert_eq!(triples_out.len(), 3, "{body}");
7737 let triples_in = body["triples_in"].as_array().unwrap();
7738 assert!(triples_in.is_empty(), "episodes have no triples_in: {body}");
7739 for e in triples_out {
7740 assert_eq!(e["kind"], "triple");
7741 assert_eq!(e["source"], format!("ep:{memory_id}"));
7742 assert!(e["target"].as_str().unwrap().starts_with("ent:"));
7743 assert!(e["predicate"].as_str().is_some());
7744 assert!(e["weight"].as_f64().is_some());
7745 }
7746 h.shutdown(&runtime);
7747 }
7748
7749 #[test]
7750 fn inspect_episode_triples_in_is_empty_for_v10p1() {
7751 let runtime = rt();
7756 let h = Harness::new(&runtime);
7757 let focal = "a2220000-0000-7000-8000-000000000001";
7758 let other = "a2220000-0000-7000-8000-000000000002";
7759 {
7760 let conn = h.open_db();
7761 seed_episode(&conn, focal, 100, "focal episode body");
7762 let other_rowid = seed_episode(&conn, other, 200, "another episode");
7763 for i in 0..5 {
7766 let tid = format!("t-other-{i}");
7767 seed_triple_row(&conn, &tid, "user", "did", "thing", Some(other_rowid));
7768 }
7769 }
7770 let (status, body) = runtime.block_on(call(
7771 h.router.clone(),
7772 "GET",
7773 &inspect_uri(&format!("ep:{focal}")),
7774 None,
7775 ));
7776 assert_eq!(status, StatusCode::OK, "body: {body}");
7777 let triples_in = body["triples_in"].as_array().unwrap();
7778 assert!(
7779 triples_in.is_empty(),
7780 "episode triples_in must be empty regardless of cross-episode entity references: {body}"
7781 );
7782 h.shutdown(&runtime);
7783 }
7784
7785 #[test]
7786 fn inspect_document_returns_full_text_concatenated_from_chunks() {
7787 let runtime = rt();
7788 let h = Harness::new(&runtime);
7789 let doc_id = "d3330000-0000-7000-8000-000000000001";
7790 {
7791 let conn = h.open_db();
7792 seed_document_row(&conn, doc_id, "doc-title");
7793 seed_chunk_row(&conn, "ch-doc-1", doc_id, 0, "First chunk body.");
7794 seed_chunk_row(&conn, "ch-doc-2", doc_id, 1, "Second chunk body.");
7795 seed_chunk_row(&conn, "ch-doc-3", doc_id, 2, "Third chunk body.");
7796 }
7797 let (status, body) = runtime.block_on(call(
7798 h.router.clone(),
7799 "GET",
7800 &inspect_uri(&format!("doc:{doc_id}")),
7801 None,
7802 ));
7803 assert_eq!(status, StatusCode::OK, "body: {body}");
7804 assert_eq!(body["node"]["kind"], "document");
7805 let full_text = body["full_text"].as_str().unwrap();
7806 assert_eq!(
7808 full_text,
7809 "First chunk body.\n\nSecond chunk body.\n\nThird chunk body."
7810 );
7811 assert!(body["triples_in"].as_array().unwrap().is_empty());
7812 assert!(body["triples_out"].as_array().unwrap().is_empty());
7813 h.shutdown(&runtime);
7814 }
7815
7816 #[test]
7817 fn inspect_chunk_returns_text() {
7818 let runtime = rt();
7819 let h = Harness::new(&runtime);
7820 let chunk_body = "This is the body of the chunk being inspected.";
7821 {
7822 let conn = h.open_db();
7823 seed_document_row(&conn, "doc-chunk-host", "host");
7824 seed_chunk_row(&conn, "chunk-inspect-target", "doc-chunk-host", 0, chunk_body);
7825 }
7826 let (status, body) = runtime.block_on(call(
7827 h.router.clone(),
7828 "GET",
7829 &inspect_uri("chunk:chunk-inspect-target"),
7830 None,
7831 ));
7832 assert_eq!(status, StatusCode::OK, "body: {body}");
7833 assert_eq!(body["node"]["kind"], "chunk");
7834 assert_eq!(body["full_text"].as_str().unwrap(), chunk_body);
7835 assert!(body["triples_in"].as_array().unwrap().is_empty());
7836 assert!(body["triples_out"].as_array().unwrap().is_empty());
7837 h.shutdown(&runtime);
7838 }
7839
7840 #[test]
7841 fn inspect_cluster_returns_label_and_abstraction() {
7842 let runtime = rt();
7843 let h = Harness::new(&runtime);
7844 let cluster_id = "cl-inspect-target";
7845 let abstraction_text = "Discussions about the deploy pipeline and on-call rotation.";
7846 {
7847 let conn = h.open_db();
7848 seed_cluster_row(&conn, cluster_id, 12345);
7849 seed_abstraction_row(&conn, "abs-1", cluster_id, abstraction_text);
7850 }
7851 let (status, body) = runtime.block_on(call(
7852 h.router.clone(),
7853 "GET",
7854 &inspect_uri(&format!("cl:{cluster_id}")),
7855 None,
7856 ));
7857 assert_eq!(status, StatusCode::OK, "body: {body}");
7858 assert_eq!(body["node"]["kind"], "cluster");
7859 let full_text = body["full_text"].as_str().unwrap();
7860 assert!(
7861 full_text.contains(cluster_id),
7862 "full_text must include cluster label: {full_text}"
7863 );
7864 assert!(
7865 full_text.contains(abstraction_text),
7866 "full_text must include abstraction text: {full_text}"
7867 );
7868 assert!(full_text.contains("\n\n"), "label and abstraction must be separated: {full_text}");
7871 h.shutdown(&runtime);
7872 }
7873
7874 #[test]
7875 fn inspect_entity_returns_triples_only() {
7876 let runtime = rt();
7877 let h = Harness::new(&runtime);
7878 {
7879 let conn = h.open_db();
7880 let rowid = seed_episode(
7881 &conn,
7882 "e5550000-0000-7000-8000-000000000001",
7883 100,
7884 "host episode",
7885 );
7886 seed_triple_row(&conn, "t-ent-1", "Alice", "knows", "Bob", Some(rowid));
7888 seed_triple_row(&conn, "t-ent-2", "Alice", "works_at", "Anthropic", Some(rowid));
7889 seed_triple_row(&conn, "t-ent-3", "user", "met", "Alice", Some(rowid));
7890 seed_triple_row(&conn, "t-ent-4", "Alice", "owns", "laptop", Some(rowid));
7891 seed_triple_row(&conn, "t-ent-5", "Carol", "mentors", "Alice", Some(rowid));
7892 }
7893 let (status, body) = runtime.block_on(call(
7894 h.router.clone(),
7895 "GET",
7896 &inspect_uri("ent:Alice"),
7897 None,
7898 ));
7899 assert_eq!(status, StatusCode::OK, "body: {body}");
7900 assert_eq!(body["node"]["kind"], "entity");
7901 assert_eq!(body["node"]["id"], "ent:Alice");
7902 assert!(
7903 body["full_text"].is_null(),
7904 "entity full_text must be null (entities have no body): {body}"
7905 );
7906 let triples_out = body["triples_out"].as_array().unwrap();
7907 assert_eq!(triples_out.len(), 5, "{body}");
7908 assert!(body["triples_in"].as_array().unwrap().is_empty());
7909 for e in triples_out {
7910 assert_eq!(e["kind"], "triple");
7911 assert_eq!(e["source"], "ent:Alice");
7912 assert!(e["target"].as_str().unwrap().starts_with("ent:"));
7915 assert_ne!(e["target"], "ent:Alice");
7916 }
7917 h.shutdown(&runtime);
7918 }
7919
7920 #[test]
7921 fn inspect_entity_with_zero_triples_returns_404() {
7922 let runtime = rt();
7923 let h = Harness::new(&runtime);
7924 {
7927 let conn = h.open_db();
7928 let rowid = seed_episode(
7929 &conn,
7930 "e6660000-0000-7000-8000-000000000001",
7931 100,
7932 "ep",
7933 );
7934 seed_triple_row(&conn, "t-other", "Bob", "knows", "Carol", Some(rowid));
7935 }
7936 let (status, body) = runtime.block_on(call(
7937 h.router.clone(),
7938 "GET",
7939 &inspect_uri("ent:Nonexistent"),
7940 None,
7941 ));
7942 assert_eq!(status, StatusCode::NOT_FOUND, "body: {body}");
7943 let err = body["error"].as_str().unwrap_or_default();
7944 assert!(
7945 err.contains("Nonexistent") || err.contains("entity"),
7946 "error must mention entity: {body}"
7947 );
7948 h.shutdown(&runtime);
7949 }
7950
7951 #[test]
7952 fn inspect_404_on_missing_node() {
7953 let runtime = rt();
7955 let h = Harness::new(&runtime);
7956 let (status, body) = runtime.block_on(call(
7957 h.router.clone(),
7958 "GET",
7959 &inspect_uri("ep:99999999-9999-7000-8000-000000000999"),
7960 None,
7961 ));
7962 assert_eq!(status, StatusCode::NOT_FOUND, "body: {body}");
7963 h.shutdown(&runtime);
7964 }
7965
7966 #[test]
7967 fn inspect_400_on_invalid_prefix() {
7968 let runtime = rt();
7969 let h = Harness::new(&runtime);
7970 let (status, body) = runtime.block_on(call(
7971 h.router.clone(),
7972 "GET",
7973 &inspect_uri("xyz:foo"),
7974 None,
7975 ));
7976 assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
7977 let err = body["error"].as_str().unwrap_or_default();
7978 assert!(
7979 err.contains("xyz") || err.contains("prefix"),
7980 "error must mention bad prefix: {body}"
7981 );
7982 h.shutdown(&runtime);
7983 }
7984
7985 #[test]
7986 fn inspect_respects_tenant_scoping() {
7987 let runtime = rt();
7988 let h = Harness::new(&runtime);
7989 let memory_id = "a7770000-0000-7000-8000-000000000001";
7990 {
7991 let conn = h.open_db();
7992 seed_episode(&conn, memory_id, 100, "tenant scope");
7993 }
7994 let r = h.router.clone();
7998 let (status, _) = runtime.block_on(async {
7999 let req = Request::builder()
8000 .method("GET")
8001 .uri(inspect_uri(&format!("ep:{memory_id}")))
8002 .header("x-solo-tenant", "never-registered-tenant")
8003 .body(Body::empty())
8004 .unwrap();
8005 let resp = r.oneshot(req).await.expect("oneshot");
8006 let s = resp.status();
8007 let _b = resp.into_body().collect().await.unwrap().to_bytes();
8008 (s, _b)
8009 });
8010 assert_eq!(status, StatusCode::NOT_FOUND);
8011 let (status, body) = runtime.block_on(call(
8013 h.router.clone(),
8014 "GET",
8015 &inspect_uri(&format!("ep:{memory_id}")),
8016 None,
8017 ));
8018 assert_eq!(status, StatusCode::OK, "default tenant must resolve: {body}");
8019 h.shutdown(&runtime);
8020 }
8021
8022 #[test]
8023 fn inspect_respects_auth_when_enabled() {
8024 let runtime = rt();
8025 let h = Harness::new_with_auth(&runtime, Some("inspect-secret".into()));
8026 let (status, _) = runtime.block_on(call(
8028 h.router.clone(),
8029 "GET",
8030 &inspect_uri("ep:99999999-9999-7000-8000-000000000999"),
8031 None,
8032 ));
8033 assert_eq!(status, StatusCode::UNAUTHORIZED);
8034 let (status, _) = runtime.block_on(call_with_auth(
8037 h.router.clone(),
8038 "GET",
8039 &inspect_uri("ep:99999999-9999-7000-8000-000000000999"),
8040 None,
8041 Some("Bearer inspect-secret"),
8042 ));
8043 assert_eq!(status, StatusCode::NOT_FOUND);
8044 h.shutdown(&runtime);
8045 }
8046
8047 fn neighbors_uri(
8061 node_id: &str,
8062 kind: Option<&str>,
8063 threshold: Option<f32>,
8064 limit: Option<u32>,
8065 ) -> String {
8066 let mut qs: Vec<String> = Vec::new();
8067 if let Some(k) = kind {
8068 qs.push(format!("kind={k}"));
8069 }
8070 if let Some(t) = threshold {
8071 qs.push(format!("threshold={t}"));
8072 }
8073 if let Some(l) = limit {
8074 qs.push(format!("limit={l}"));
8075 }
8076 let encoded = percent_encode_node_id(node_id);
8077 if qs.is_empty() {
8078 format!("/v1/graph/neighbors/{encoded}")
8079 } else {
8080 format!("/v1/graph/neighbors/{encoded}?{}", qs.join("&"))
8081 }
8082 }
8083
8084 #[test]
8089 fn neighbors_explicit_only_returns_no_semantic_edges() {
8090 let runtime = rt();
8091 let h = Harness::new(&runtime);
8092 runtime.block_on(async {
8093 let focal = post_remember(h.router.clone(), "alpha alpha alpha").await;
8097 let _other1 = post_remember(h.router.clone(), "beta beta beta").await;
8098 let _other2 = post_remember(h.router.clone(), "gamma gamma gamma").await;
8099 {
8102 let conn = h.open_db();
8103 let rowid: i64 = conn
8104 .query_row(
8105 "SELECT rowid FROM episodes WHERE memory_id = ?1",
8106 rusqlite::params![&focal],
8107 |r| r.get(0),
8108 )
8109 .unwrap();
8110 seed_triple_row(&conn, "t-exp-1", "Alice", "knows", "Bob", Some(rowid));
8111 seed_triple_row(&conn, "t-exp-2", "Alice", "owns", "laptop", Some(rowid));
8112 }
8113 let (status, body) = call(
8114 h.router.clone(),
8115 "GET",
8116 &neighbors_uri(&format!("ep:{focal}"), Some("explicit"), None, None),
8117 None,
8118 )
8119 .await;
8120 assert_eq!(status, StatusCode::OK, "body: {body}");
8121 let edges = body["edges"].as_array().unwrap();
8122 assert!(!edges.is_empty(), "expected explicit edges: {body}");
8123 for e in edges {
8124 assert_ne!(
8125 e["kind"], "semantic",
8126 "kind=explicit must drop semantic edges: {body}"
8127 );
8128 }
8129 });
8130 h.shutdown(&runtime);
8131 }
8132
8133 #[test]
8136 fn neighbors_semantic_only_returns_no_explicit_edges() {
8137 let runtime = rt();
8138 let h = Harness::new(&runtime);
8139 runtime.block_on(async {
8140 let focal = post_remember(h.router.clone(), "alpha alpha alpha").await;
8141 let _other1 = post_remember(h.router.clone(), "beta beta beta").await;
8142 let _other2 = post_remember(h.router.clone(), "gamma gamma gamma").await;
8143 {
8144 let conn = h.open_db();
8145 let rowid: i64 = conn
8146 .query_row(
8147 "SELECT rowid FROM episodes WHERE memory_id = ?1",
8148 rusqlite::params![&focal],
8149 |r| r.get(0),
8150 )
8151 .unwrap();
8152 seed_triple_row(&conn, "t-exp-1", "Alice", "knows", "Bob", Some(rowid));
8153 }
8154 let (status, body) = call(
8156 h.router.clone(),
8157 "GET",
8158 &neighbors_uri(&format!("ep:{focal}"), Some("semantic"), Some(0.0), None),
8159 None,
8160 )
8161 .await;
8162 assert_eq!(status, StatusCode::OK, "body: {body}");
8163 let edges = body["edges"].as_array().unwrap();
8164 for e in edges {
8165 assert_eq!(
8166 e["kind"], "semantic",
8167 "kind=semantic must drop explicit edges: {body}"
8168 );
8169 assert!(e["weight"].is_number(), "semantic edges carry weight: {body}");
8170 }
8171 });
8172 h.shutdown(&runtime);
8173 }
8174
8175 #[test]
8177 fn neighbors_both_default_returns_combined() {
8178 let runtime = rt();
8179 let h = Harness::new(&runtime);
8180 runtime.block_on(async {
8181 let focal = post_remember(h.router.clone(), "alpha alpha alpha").await;
8182 let _other1 = post_remember(h.router.clone(), "beta beta beta").await;
8183 {
8184 let conn = h.open_db();
8185 let rowid: i64 = conn
8186 .query_row(
8187 "SELECT rowid FROM episodes WHERE memory_id = ?1",
8188 rusqlite::params![&focal],
8189 |r| r.get(0),
8190 )
8191 .unwrap();
8192 seed_triple_row(&conn, "t-both-1", "Alice", "met", "Bob", Some(rowid));
8193 }
8194 let (status, body) = call(
8195 h.router.clone(),
8196 "GET",
8197 &neighbors_uri(&format!("ep:{focal}"), None, Some(0.0), None),
8200 None,
8201 )
8202 .await;
8203 assert_eq!(status, StatusCode::OK, "body: {body}");
8204 let edges = body["edges"].as_array().unwrap();
8205 let kinds: std::collections::HashSet<&str> = edges
8206 .iter()
8207 .map(|e| e["kind"].as_str().unwrap())
8208 .collect();
8209 assert!(
8210 kinds.contains("triple"),
8211 "expected at least one triple edge: {body}"
8212 );
8213 assert!(
8214 kinds.contains("semantic"),
8215 "expected at least one semantic edge: {body}"
8216 );
8217 });
8218 h.shutdown(&runtime);
8219 }
8220
8221 #[test]
8226 fn neighbors_dedupes_semantic_when_explicit_exists() {
8227 let runtime = rt();
8228 let h = Harness::new(&runtime);
8229 runtime.block_on(async {
8230 let focal = post_remember(h.router.clone(), "alpha alpha alpha").await;
8231 let _other = post_remember(h.router.clone(), "beta beta beta").await;
8267 {
8268 let conn = h.open_db();
8269 let rowid: i64 = conn
8270 .query_row(
8271 "SELECT rowid FROM episodes WHERE memory_id = ?1",
8272 rusqlite::params![&focal],
8273 |r| r.get(0),
8274 )
8275 .unwrap();
8276 seed_triple_row(
8277 &conn,
8278 "t-dedupe-1",
8279 "Alice",
8280 "knows",
8281 "Bob",
8282 Some(rowid),
8283 );
8284 }
8285 let (status, body) = call(
8286 h.router.clone(),
8287 "GET",
8288 &neighbors_uri(&format!("ep:{focal}"), Some("both"), Some(0.0), None),
8289 None,
8290 )
8291 .await;
8292 assert_eq!(status, StatusCode::OK, "body: {body}");
8293 let edges = body["edges"].as_array().unwrap();
8297 let mut seen: std::collections::HashMap<(String, String), i32> =
8298 std::collections::HashMap::new();
8299 for e in edges {
8300 let key = (
8301 e["source"].as_str().unwrap().to_string(),
8302 e["target"].as_str().unwrap().to_string(),
8303 );
8304 *seen.entry(key).or_insert(0) += 1;
8305 }
8306 for (pair, count) in &seen {
8307 assert_eq!(
8308 *count, 1,
8309 "edge pair {pair:?} appears {count} times -- dedupe rule violated: {body}"
8310 );
8311 }
8312 });
8313 h.shutdown(&runtime);
8314 }
8315
8316 #[test]
8319 fn neighbors_threshold_filters_low_similarity() {
8320 let runtime = rt();
8321 let h = Harness::new(&runtime);
8322 runtime.block_on(async {
8323 let focal = post_remember(h.router.clone(), "alpha alpha alpha").await;
8324 let _o1 = post_remember(h.router.clone(), "beta one").await;
8325 let _o2 = post_remember(h.router.clone(), "beta two").await;
8326 let _o3 = post_remember(h.router.clone(), "beta three").await;
8327 let (status, low_body) = call(
8329 h.router.clone(),
8330 "GET",
8331 &neighbors_uri(&format!("ep:{focal}"), Some("semantic"), Some(0.0), None),
8332 None,
8333 )
8334 .await;
8335 assert_eq!(status, StatusCode::OK, "body: {low_body}");
8336 let low_edge_count = low_body["edges"].as_array().unwrap().len();
8337 let (status, high_body) = call(
8339 h.router.clone(),
8340 "GET",
8341 &neighbors_uri(&format!("ep:{focal}"), Some("semantic"), Some(0.99), None),
8342 None,
8343 )
8344 .await;
8345 assert_eq!(status, StatusCode::OK, "body: {high_body}");
8346 let high_edge_count = high_body["edges"].as_array().unwrap().len();
8347 assert!(
8348 high_edge_count <= low_edge_count,
8349 "high-threshold ({high_edge_count}) must not exceed low-threshold ({low_edge_count}): low={low_body}, high={high_body}"
8350 );
8351 for e in high_body["edges"].as_array().unwrap() {
8354 if let Some(w) = e["weight"].as_f64() {
8355 assert!(
8356 w >= 0.99,
8357 "edge with weight {w} survived threshold=0.99: {e}"
8358 );
8359 }
8360 }
8361 });
8362 h.shutdown(&runtime);
8363 }
8364
8365 #[test]
8368 fn neighbors_limit_clamped_at_100() {
8369 let runtime = rt();
8370 let h = Harness::new(&runtime);
8371 {
8374 let conn = h.open_db();
8375 seed_cluster_row(&conn, "cl-huge-n", 1000);
8376 for i in 0..150 {
8377 let mid = format!("99119911-1111-7000-8000-{:012}", i);
8378 seed_episode(&conn, &mid, 100 + i as i64, &format!("content {i}"));
8379 seed_cluster_member(&conn, "cl-huge-n", &mid);
8380 }
8381 }
8382 let (status, body) = runtime.block_on(call(
8383 h.router.clone(),
8384 "GET",
8385 &neighbors_uri("cl:cl-huge-n", Some("explicit"), None, Some(999)),
8386 None,
8387 ));
8388 assert_eq!(status, StatusCode::OK, "body: {body}");
8389 let edges = body["edges"].as_array().unwrap();
8390 assert_eq!(
8391 edges.len(),
8392 100,
8393 "limit must be silently clamped to 100, got {}",
8394 edges.len()
8395 );
8396 h.shutdown(&runtime);
8397 }
8398
8399 #[test]
8401 fn neighbors_semantic_rejects_document_source() {
8402 let runtime = rt();
8403 let h = Harness::new(&runtime);
8404 let doc_id = "d-semrej-0000-7000-8000-000000000001";
8405 {
8406 let conn = h.open_db();
8407 seed_document_row(&conn, doc_id, "host");
8408 }
8409 let (status, body) = runtime.block_on(call(
8410 h.router.clone(),
8411 "GET",
8412 &neighbors_uri(
8413 &format!("doc:{doc_id}"),
8414 Some("semantic"),
8415 None,
8416 None,
8417 ),
8418 None,
8419 ));
8420 assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
8421 let err = body["error"].as_str().unwrap_or_default();
8422 assert!(
8423 err.contains("episode") && err.contains("chunk"),
8424 "error must list supported kinds: {body}"
8425 );
8426 h.shutdown(&runtime);
8427 }
8428
8429 #[test]
8431 fn neighbors_semantic_rejects_cluster_source() {
8432 let runtime = rt();
8433 let h = Harness::new(&runtime);
8434 let cluster_id = "cl-semrej-target";
8435 {
8436 let conn = h.open_db();
8437 seed_cluster_row(&conn, cluster_id, 12345);
8438 }
8439 let (status, body) = runtime.block_on(call(
8440 h.router.clone(),
8441 "GET",
8442 &neighbors_uri(
8443 &format!("cl:{cluster_id}"),
8444 Some("semantic"),
8445 None,
8446 None,
8447 ),
8448 None,
8449 ));
8450 assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
8451 h.shutdown(&runtime);
8452 }
8453
8454 #[test]
8458 fn neighbors_entity_returns_triples_only() {
8459 let runtime = rt();
8460 let h = Harness::new(&runtime);
8461 runtime.block_on(async {
8462 let host_mid = post_remember(h.router.clone(), "Alice and Bob talked").await;
8467 {
8468 let conn = h.open_db();
8469 let rowid: i64 = conn
8470 .query_row(
8471 "SELECT rowid FROM episodes WHERE memory_id = ?1",
8472 rusqlite::params![&host_mid],
8473 |r| r.get(0),
8474 )
8475 .unwrap();
8476 seed_triple_row(&conn, "t-ent-n-1", "Alice", "knows", "Bob", Some(rowid));
8477 seed_triple_row(&conn, "t-ent-n-2", "Alice", "works_at", "Acme", Some(rowid));
8478 }
8479 let (status, body) = call(
8480 h.router.clone(),
8481 "GET",
8482 &neighbors_uri("ent:Alice", None, Some(0.0), None),
8483 None,
8484 )
8485 .await;
8486 assert_eq!(status, StatusCode::OK, "body: {body}");
8487 let edges = body["edges"].as_array().unwrap();
8488 assert!(!edges.is_empty(), "expected explicit triples: {body}");
8489 for e in edges {
8490 assert_eq!(
8491 e["kind"], "triple",
8492 "entity focal must produce only triple edges: {body}"
8493 );
8494 }
8495 });
8496 h.shutdown(&runtime);
8497 }
8498
8499 #[test]
8502 fn neighbors_respects_tenant_scoping() {
8503 let runtime = rt();
8504 let h = Harness::new(&runtime);
8505 let memory_id = "a8880000-0000-7000-8000-000000000001";
8506 {
8507 let conn = h.open_db();
8508 seed_episode(&conn, memory_id, 100, "tenant scope");
8509 }
8510 let r = h.router.clone();
8512 let (status, _) = runtime.block_on(async {
8513 let req = Request::builder()
8514 .method("GET")
8515 .uri(neighbors_uri(
8516 &format!("ep:{memory_id}"),
8517 Some("explicit"),
8518 None,
8519 None,
8520 ))
8521 .header("x-solo-tenant", "never-registered-tenant-n")
8522 .body(Body::empty())
8523 .unwrap();
8524 let resp = r.oneshot(req).await.expect("oneshot");
8525 let s = resp.status();
8526 let _b = resp.into_body().collect().await.unwrap().to_bytes();
8527 (s, _b)
8528 });
8529 assert_eq!(status, StatusCode::NOT_FOUND);
8530 let (status, body) = runtime.block_on(call(
8532 h.router.clone(),
8533 "GET",
8534 &neighbors_uri(&format!("ep:{memory_id}"), Some("explicit"), None, None),
8535 None,
8536 ));
8537 assert_eq!(status, StatusCode::OK, "default tenant must resolve: {body}");
8538 h.shutdown(&runtime);
8539 }
8540
8541 #[test]
8544 fn neighbors_respects_auth_when_enabled() {
8545 let runtime = rt();
8546 let h = Harness::new_with_auth(&runtime, Some("neighbors-secret".into()));
8547 let (status, _) = runtime.block_on(call(
8549 h.router.clone(),
8550 "GET",
8551 &neighbors_uri(
8552 "ep:99999999-9999-7000-8000-000000000999",
8553 Some("explicit"),
8554 None,
8555 None,
8556 ),
8557 None,
8558 ));
8559 assert_eq!(status, StatusCode::UNAUTHORIZED);
8560 let (status, _) = runtime.block_on(call_with_auth(
8562 h.router.clone(),
8563 "GET",
8564 &neighbors_uri(
8565 "ep:99999999-9999-7000-8000-000000000999",
8566 Some("explicit"),
8567 None,
8568 None,
8569 ),
8570 None,
8571 Some("Bearer neighbors-secret"),
8572 ));
8573 assert_eq!(status, StatusCode::NOT_FOUND);
8574 h.shutdown(&runtime);
8575 }
8576
    /// One decoded server-sent event as consumed by the stream tests.
    #[derive(Debug, Clone)]
    struct ParsedSseEvent {
        /// Value of the block's `event:` field (the tests expect names such
        /// as `init` and `invalidate`).
        event: String,
        /// The block's `data:` payload, parsed as JSON.
        data: Value,
    }
8596
8597 async fn read_one_sse_event(
8601 body: &mut axum::body::Body,
8602 timeout: std::time::Duration,
8603 ) -> Option<ParsedSseEvent> {
8604 use http_body_util::BodyExt;
8605 let mut buf = String::new();
8606 let start = std::time::Instant::now();
8607 loop {
8608 if start.elapsed() >= timeout {
8609 return None;
8610 }
8611 let remaining = timeout.saturating_sub(start.elapsed());
8612 let frame_res =
8613 tokio::time::timeout(remaining, body.frame()).await;
8614 let frame = match frame_res {
8615 Ok(Some(Ok(f))) => f,
8616 Ok(Some(Err(_))) | Ok(None) => return None,
8617 Err(_) => return None,
8618 };
8619 if let Ok(data) = frame.into_data() {
8620 buf.push_str(&String::from_utf8_lossy(&data));
8621 while let Some(idx) = buf.find("\n\n") {
8623 let block: String = buf.drain(..idx + 2).collect();
8624 if let Some(parsed) = parse_sse_block(&block) {
8625 return Some(parsed);
8626 }
8627 }
8628 }
8629 }
8630 }
8631
8632 fn parse_sse_block(block: &str) -> Option<ParsedSseEvent> {
8636 let mut event: Option<String> = None;
8637 let mut data: Option<String> = None;
8638 for line in block.lines() {
8639 if let Some(rest) = line.strip_prefix("event:") {
8640 event = Some(rest.trim().to_string());
8641 } else if let Some(rest) = line.strip_prefix("data:") {
8642 data = Some(rest.trim().to_string());
8643 }
8644 }
8645 let event = event?;
8646 let data_str = data?;
8647 let data_json = serde_json::from_str(&data_str).ok()?;
8648 Some(ParsedSseEvent {
8649 event,
8650 data: data_json,
8651 })
8652 }
8653
8654 async fn open_sse_stream_inner(
8658 router: axum::Router,
8659 auth: Option<&str>,
8660 tenant: Option<&str>,
8661 ) -> (StatusCode, axum::body::Body) {
8662 let mut builder = Request::builder()
8663 .method("GET")
8664 .uri("/v1/graph/stream");
8665 if let Some(a) = auth {
8666 builder = builder.header("authorization", a);
8667 }
8668 if let Some(t) = tenant {
8669 builder = builder.header("x-solo-tenant", t);
8670 }
8671 let req = builder
8672 .header("content-length", "0")
8673 .body(Body::empty())
8674 .unwrap();
8675 let resp = router.oneshot(req).await.expect("oneshot");
8676 let status = resp.status();
8677 let body = resp.into_body();
8678 (status, body)
8679 }
8680
8681 #[test]
8683 fn stream_emits_init_event_on_connect() {
8684 let runtime = rt();
8685 let h = Harness::new(&runtime);
8686 let r = h.router.clone();
8687 runtime.block_on(async {
8688 let (status, mut body) = open_sse_stream_inner(r, None, None).await;
8689 assert_eq!(status, StatusCode::OK);
8690 let ev = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8691 .await
8692 .expect("must receive init event within 2s");
8693 assert_eq!(ev.event, "init");
8694 assert_eq!(ev.data["connected"].as_bool(), Some(true));
8695 assert_eq!(ev.data["tenant_id"].as_str(), Some("default"));
8696 assert!(ev.data["ts_ms"].is_number());
8697 });
8698 h.shutdown(&runtime);
8699 }
8700
8701 #[test]
8704 fn stream_emits_invalidate_after_writer_event() {
8705 let runtime = rt();
8706 let h = Harness::new(&runtime);
8707 let r = h.router.clone();
8708 let sender = h.invalidate_sender();
8709 runtime.block_on(async {
8710 let (status, mut body) = open_sse_stream_inner(r, None, None).await;
8711 assert_eq!(status, StatusCode::OK);
8712 let init = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8714 .await
8715 .unwrap();
8716 assert_eq!(init.event, "init");
8717 sender
8719 .send(InvalidateEvent {
8720 reason: "memory.remember".to_string(),
8721 tenant_id: "default".to_string(),
8722 ts_ms: 1_715_625_600_000,
8723 kind: "episode".to_string(),
8724 })
8725 .expect("must have at least one subscriber");
8726 let ev = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8728 .await
8729 .expect("invalidate event must arrive within 2s");
8730 assert_eq!(ev.event, "invalidate");
8731 assert_eq!(ev.data["reason"].as_str(), Some("memory.remember"));
8732 assert_eq!(ev.data["tenant_id"].as_str(), Some("default"));
8733 assert_eq!(ev.data["kind"].as_str(), Some("episode"));
8734 });
8735 h.shutdown(&runtime);
8736 }
8737
8738 #[test]
8741 fn stream_emits_invalidate_for_each_writer_command() {
8742 let runtime = rt();
8743 let h = Harness::new(&runtime);
8744 let r = h.router.clone();
8745 let sender = h.invalidate_sender();
8746 let cases = [
8747 ("memory.remember", "episode"),
8748 ("memory.forget", "episode"),
8749 ("memory.consolidate", "cluster"),
8750 ("memory.ingest_document", "document"),
8751 ("memory.forget_document", "document"),
8752 ("memory.triples_extract", "cluster"),
8753 ("memory.reembed", "episode"),
8754 ("gdpr.forget_user", "tenant"),
8755 ];
8756 runtime.block_on(async {
8757 let (status, mut body) = open_sse_stream_inner(r, None, None).await;
8758 assert_eq!(status, StatusCode::OK);
8759 let _ = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8761 .await
8762 .unwrap();
8763 for (reason, kind) in cases {
8764 sender
8765 .send(InvalidateEvent {
8766 reason: reason.to_string(),
8767 tenant_id: "default".to_string(),
8768 ts_ms: 1_715_625_600_000,
8769 kind: kind.to_string(),
8770 })
8771 .unwrap();
8772 let ev = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8773 .await
8774 .unwrap_or_else(|| panic!("must receive event for {reason}"));
8775 assert_eq!(ev.event, "invalidate");
8776 assert_eq!(
8777 ev.data["reason"].as_str(),
8778 Some(reason),
8779 "reason mismatch"
8780 );
8781 assert_eq!(ev.data["kind"].as_str(), Some(kind), "kind mismatch");
8782 }
8783 });
8784 h.shutdown(&runtime);
8785 }
8786
8787 #[test]
8795 fn stream_emits_heartbeat_when_no_events() {
8796 let runtime = rt();
8797 let h = Harness::new(&runtime);
8798 let sender = h.invalidate_sender();
8799 runtime.block_on(async {
8800 let rx = sender.subscribe();
8803 let stream = build_invalidate_stream(rx, "default".to_string(), 1);
8806 let sse: Sse<_> = Sse::new(stream);
8810 let resp = sse.into_response();
8811 let mut body = resp.into_body();
8812 let first =
8814 read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8815 .await
8816 .expect("init event must arrive");
8817 assert_eq!(first.event, "init");
8818 let second =
8821 read_one_sse_event(&mut body, std::time::Duration::from_secs(3))
8822 .await
8823 .expect("heartbeat event must arrive within 3s");
8824 assert_eq!(second.event, "heartbeat");
8825 assert!(second.data["ts_ms"].is_number());
8826 });
8827 h.shutdown(&runtime);
8828 }
8829
8830 #[test]
8833 fn stream_concurrent_subscribers_same_tenant() {
8834 let runtime = rt();
8835 let h = Harness::new(&runtime);
8836 let r1 = h.router.clone();
8837 let r2 = h.router.clone();
8838 let r3 = h.router.clone();
8839 let sender = h.invalidate_sender();
8840 runtime.block_on(async {
8841 let (s1, mut body1) = open_sse_stream_inner(r1, None, None).await;
8843 let (s2, mut body2) = open_sse_stream_inner(r2, None, None).await;
8844 let (s3, mut body3) = open_sse_stream_inner(r3, None, None).await;
8845 assert_eq!(s1, StatusCode::OK);
8846 assert_eq!(s2, StatusCode::OK);
8847 assert_eq!(s3, StatusCode::OK);
8848 for body in [&mut body1, &mut body2, &mut body3] {
8850 let ev = read_one_sse_event(body, std::time::Duration::from_secs(2))
8851 .await
8852 .unwrap();
8853 assert_eq!(ev.event, "init");
8854 }
8855 assert!(
8857 sender.receiver_count() >= 3,
8858 "expected ≥3 subscribers, got {}",
8859 sender.receiver_count()
8860 );
8861 sender
8863 .send(InvalidateEvent {
8864 reason: "memory.remember".to_string(),
8865 tenant_id: "default".to_string(),
8866 ts_ms: 1_715_625_600_000,
8867 kind: "episode".to_string(),
8868 })
8869 .expect("send must succeed");
8870 for body in [&mut body1, &mut body2, &mut body3] {
8872 let ev = read_one_sse_event(body, std::time::Duration::from_secs(2))
8873 .await
8874 .unwrap();
8875 assert_eq!(ev.event, "invalidate");
8876 assert_eq!(ev.data["reason"].as_str(), Some("memory.remember"));
8877 }
8878 });
8879 h.shutdown(&runtime);
8880 }
8881
8882 #[test]
8885 fn stream_handles_client_disconnect_gracefully() {
8886 let runtime = rt();
8887 let h = Harness::new(&runtime);
8888 let r = h.router.clone();
8889 let sender = h.invalidate_sender();
8890 let before = sender.receiver_count();
8891 runtime.block_on(async {
8892 let (status, mut body) = open_sse_stream_inner(r, None, None).await;
8893 assert_eq!(status, StatusCode::OK);
8894 let _ = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8896 .await
8897 .unwrap();
8898 let during = sender.receiver_count();
8899 assert!(
8900 during > before,
8901 "subscriber count must increase while stream is live (before={before}, during={during})"
8902 );
8903 drop(body);
8907 });
8908 runtime.block_on(async {
8910 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
8911 });
8912 let after = sender.receiver_count();
8913 assert!(
8914 after <= before,
8915 "subscriber count must drop back after disconnect (before={before}, after={after})"
8916 );
8917 h.shutdown(&runtime);
8918 }
8919
8920 #[test]
8922 fn stream_respects_auth_when_enabled() {
8923 let runtime = rt();
8924 let h = Harness::new_with_auth(&runtime, Some("stream-secret".into()));
8925 let r = h.router.clone();
8926 runtime.block_on(async {
8927 let (status, _body) = open_sse_stream_inner(r, None, None).await;
8928 assert_eq!(status, StatusCode::UNAUTHORIZED);
8929 });
8930 h.shutdown(&runtime);
8931 }
8932
8933 #[test]
8935 fn stream_works_with_auth_none() {
8936 let runtime = rt();
8937 let h = Harness::new(&runtime);
8938 let r = h.router.clone();
8939 runtime.block_on(async {
8940 let (status, mut body) = open_sse_stream_inner(r, None, None).await;
8941 assert_eq!(status, StatusCode::OK);
8942 let ev = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8943 .await
8944 .expect("must receive init event");
8945 assert_eq!(ev.event, "init");
8946 });
8947 h.shutdown(&runtime);
8948 }
8949
8950 #[test]
8952 fn stream_respects_auth_accepts_valid_token() {
8953 let runtime = rt();
8954 let h = Harness::new_with_auth(&runtime, Some("stream-secret".into()));
8955 let r = h.router.clone();
8956 runtime.block_on(async {
8957 let (status, mut body) =
8958 open_sse_stream_inner(r, Some("Bearer stream-secret"), None).await;
8959 assert_eq!(status, StatusCode::OK);
8960 let ev = read_one_sse_event(&mut body, std::time::Duration::from_secs(2))
8961 .await
8962 .expect("must receive init event with valid bearer");
8963 assert_eq!(ev.event, "init");
8964 assert_eq!(ev.data["tenant_id"].as_str(), Some("default"));
8965 });
8966 h.shutdown(&runtime);
8967 }
8968
8969 #[test]
8972 fn stream_respects_tenant_scoping() {
8973 let runtime = rt();
8974 let h = Harness::new(&runtime);
8975 let r = h.router.clone();
8976 runtime.block_on(async {
8977 let (status, _body) =
8978 open_sse_stream_inner(r, None, Some("never-registered-tenant-x")).await;
8979 assert_eq!(status, StatusCode::NOT_FOUND);
8983 });
8984 h.shutdown(&runtime);
8985 }
8986
8987 async fn seed_three_tenants(registry: &TenantRegistry) -> Vec<String> {
9005 use solo_core::TenantId as TenantIdT;
9006 let ids = ["alice", "bob", "default"];
9007 for id in ids {
9008 let tid = TenantIdT::new(id).unwrap();
9009 registry
9010 .with_index(|idx| {
9011 idx.register(&tid, &format!("{id}.db"), Some(&format!("{id} tenant")))
9012 .unwrap();
9013 })
9018 .await;
9019 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
9020 }
9021 vec!["alice".into(), "bob".into(), "default".into()]
9025 }
9026
9027 #[test]
9031 fn tenants_returns_all_when_auth_none() {
9032 let runtime = rt();
9033 let h = Harness::new(&runtime);
9034 let r = h.router.clone();
9035 runtime.block_on(async {
9036 let _expected = seed_three_tenants(&h.registry).await;
9037 let (status, body) = call(r, "GET", "/v1/tenants", None).await;
9038 assert_eq!(status, StatusCode::OK);
9039 let arr = body
9040 .get("tenants")
9041 .and_then(|v| v.as_array())
9042 .expect("tenants array");
9043 assert_eq!(arr.len(), 3, "got body: {body}");
9044 let ids: Vec<&str> =
9045 arr.iter().filter_map(|t| t["id"].as_str()).collect();
9046 assert_eq!(ids, vec!["alice", "bob", "default"]);
9047 });
9048 h.shutdown(&runtime);
9049 }
9050
9051 #[test]
9056 fn tenants_returns_all_when_bearer_auth() {
9057 let runtime = rt();
9058 let h = Harness::new_with_auth(&runtime, Some("tlist-secret".into()));
9059 let r = h.router.clone();
9060 runtime.block_on(async {
9061 seed_three_tenants(&h.registry).await;
9062 let (status, body) = call_with_auth(
9063 r,
9064 "GET",
9065 "/v1/tenants",
9066 None,
9067 Some("Bearer tlist-secret"),
9068 )
9069 .await;
9070 assert_eq!(status, StatusCode::OK, "got body: {body}");
9071 let arr = body["tenants"].as_array().expect("tenants array");
9072 assert_eq!(arr.len(), 3, "bearer must see all tenants");
9073 });
9074 h.shutdown(&runtime);
9075 }
9076
9077 #[test]
9081 fn tenants_filters_to_principal_claim_when_oidc() {
9082 let runtime = rt();
9083 let (fake_server, discovery_url, secret, kid) =
9084 runtime.block_on(async { spin_fake_idp().await });
9085 let server_uri = fake_server.uri();
9086 let _server_guard = fake_server;
9087
9088 let auth = crate::auth::AuthConfig::Oidc {
9089 discovery_url,
9090 audience: "tlist-audience".to_string(),
9091 tenant_claim_name: "solo_tenant".to_string(),
9092 };
9093 let h = Harness::new_with_auth_config(&runtime, Some(auth));
9094 let r = h.router.clone();
9095
9096 runtime.block_on(async {
9097 seed_three_tenants(&h.registry).await;
9098 let token = mint_idp_token(
9099 &server_uri,
9100 kid,
9101 &secret,
9102 "alice",
9103 "tlist-audience",
9104 );
9105 let (status, body) = call_with_auth(
9106 r,
9107 "GET",
9108 "/v1/tenants",
9109 None,
9110 Some(&format!("Bearer {token}")),
9111 )
9112 .await;
9113 assert_eq!(status, StatusCode::OK, "got body: {body}");
9114 let arr = body["tenants"].as_array().expect("tenants array");
9115 assert_eq!(arr.len(), 1, "OIDC alice must see exactly one tenant");
9116 assert_eq!(arr[0]["id"].as_str(), Some("alice"));
9117 });
9118 h.shutdown(&runtime);
9119 }
9120
9121 #[test]
9127 fn tenants_returns_empty_when_oidc_claim_unmatched() {
9128 let runtime = rt();
9129 let (fake_server, discovery_url, secret, kid) =
9130 runtime.block_on(async { spin_fake_idp().await });
9131 let server_uri = fake_server.uri();
9132 let _server_guard = fake_server;
9133
9134 let auth = crate::auth::AuthConfig::Oidc {
9135 discovery_url,
9136 audience: "tlist-audience".to_string(),
9137 tenant_claim_name: "solo_tenant".to_string(),
9138 };
9139 let h = Harness::new_with_auth_config(&runtime, Some(auth));
9140 let r = h.router.clone();
9141
9142 runtime.block_on(async {
9143 seed_three_tenants(&h.registry).await;
9144 let token = mint_idp_token(
9147 &server_uri,
9148 kid,
9149 &secret,
9150 "nonexistent",
9151 "tlist-audience",
9152 );
9153 let (status, body) = call_with_auth(
9154 r,
9155 "GET",
9156 "/v1/tenants",
9157 None,
9158 Some(&format!("Bearer {token}")),
9159 )
9160 .await;
9161 assert_eq!(
9162 status,
9163 StatusCode::OK,
9164 "must be 200 OK, not 404 — don't leak tenant existence: {body}"
9165 );
9166 let arr = body["tenants"].as_array().expect("tenants array");
9167 assert_eq!(
9168 arr.len(),
9169 0,
9170 "unmatched OIDC claim must produce empty list, got: {body}"
9171 );
9172 });
9173 h.shutdown(&runtime);
9174 }
9175
9176 #[test]
9185 fn tenants_response_shape_matches_solo_web_types() {
9186 let runtime = rt();
9187 let h = Harness::new(&runtime);
9188 let r = h.router.clone();
9189 runtime.block_on(async {
9190 let tid = solo_core::TenantId::new("shaped").unwrap();
9193 h.registry
9194 .with_index(|idx| {
9195 idx.register_with_quota(
9196 &tid,
9197 "shaped.db",
9198 Some("Shaped tenant"),
9199 Some(1_048_576),
9200 )
9201 .unwrap();
9202 })
9203 .await;
9204 let (status, body) = call(r, "GET", "/v1/tenants", None).await;
9205 assert_eq!(status, StatusCode::OK);
9206 let item = &body["tenants"][0];
9207 assert_eq!(item["id"].as_str(), Some("shaped"));
9209 assert_eq!(item["display_name"].as_str(), Some("Shaped tenant"));
9210 assert!(
9211 item["created_at_ms"].is_i64(),
9212 "created_at_ms must be an i64, got {item}"
9213 );
9214 assert_eq!(item["status"].as_str(), Some("active"));
9215 assert_eq!(item["quota_bytes"].as_u64(), Some(1_048_576));
9217 assert!(
9221 item["episode_count"].is_null(),
9222 "episode_count must be JSON null in v0.10.0, got {item}"
9223 );
9224 assert!(
9225 item["size_bytes"].is_null(),
9226 "size_bytes must be JSON null in v0.10.0, got {item}"
9227 );
9228 assert!(
9229 item["pct_used"].is_null(),
9230 "pct_used must be JSON null in v0.10.0, got {item}"
9231 );
9232 });
9233 h.shutdown(&runtime);
9234 }
9235
9236 #[test]
9241 fn tenants_respects_auth_when_enabled() {
9242 let runtime = rt();
9243 let h = Harness::new_with_auth(&runtime, Some("must-auth".into()));
9244 let r = h.router.clone();
9245 runtime.block_on(async {
9246 seed_three_tenants(&h.registry).await;
9247 let (status, _body) = call(r, "GET", "/v1/tenants", None).await;
9249 assert_eq!(status, StatusCode::UNAUTHORIZED);
9250 });
9251 h.shutdown(&runtime);
9252 }
9253
9254 #[test]
9259 fn tenants_status_filter_excludes_non_active() {
9260 let runtime = rt();
9261 let h = Harness::new(&runtime);
9262 let r = h.router.clone();
9263 runtime.block_on(async {
9264 let keeper = solo_core::TenantId::new("keeper").unwrap();
9267 let migrating = solo_core::TenantId::new("migrating").unwrap();
9268 let deleting = solo_core::TenantId::new("deleting").unwrap();
9269 h.registry
9270 .with_index(|idx| {
9271 idx.register(&keeper, "keeper.db", None).unwrap();
9272 idx.register_with_status(
9273 &migrating,
9274 "migrating.db",
9275 None,
9276 solo_storage::TenantStatus::PendingMigration,
9277 )
9278 .unwrap();
9279 idx.register_with_status(
9280 &deleting,
9281 "deleting.db",
9282 None,
9283 solo_storage::TenantStatus::PendingDelete,
9284 )
9285 .unwrap();
9286 })
9287 .await;
9288 let (status, body) = call(r, "GET", "/v1/tenants", None).await;
9289 assert_eq!(status, StatusCode::OK);
9290 let arr = body["tenants"].as_array().expect("tenants array");
9291 let ids: Vec<&str> =
9292 arr.iter().filter_map(|t| t["id"].as_str()).collect();
9293 assert_eq!(
9294 ids,
9295 vec!["keeper"],
9296 "only Active tenants visible; got: {body}"
9297 );
9298 });
9299 h.shutdown(&runtime);
9300 }
9301
9302 #[test]
9307 fn tenants_returns_empty_array_when_no_tenants_registered() {
9308 let runtime = rt();
9309 let h = Harness::new(&runtime);
9310 let r = h.router.clone();
9311 runtime.block_on(async {
9312 let (status, body) = call(r, "GET", "/v1/tenants", None).await;
9316 assert_eq!(status, StatusCode::OK);
9317 let arr = body["tenants"].as_array().expect("tenants array");
9318 assert_eq!(arr.len(), 0, "expected empty array, got: {body}");
9319 });
9320 h.shutdown(&runtime);
9321 }
9322
9323 fn make_record(id: &str) -> solo_storage::TenantRecord {
9333 solo_storage::TenantRecord {
9334 tenant_id: solo_core::TenantId::new(id).unwrap(),
9335 db_filename: format!("{id}.db"),
9336 display_name: None,
9337 created_at_ms: 0,
9338 status: solo_storage::TenantStatus::Active,
9339 quota_bytes: None,
9340 last_accessed_ms: None,
9341 }
9342 }
9343
9344 #[test]
9345 fn filter_no_principal_returns_all() {
9346 let records = vec![make_record("a"), make_record("b")];
9347 let out = filter_tenants_for_principal(records.clone(), None);
9348 assert_eq!(out.len(), 2);
9349 assert_eq!(out[0].tenant_id.as_str(), "a");
9350 assert_eq!(out[1].tenant_id.as_str(), "b");
9351 }
9352
9353 #[test]
9354 fn filter_bearer_principal_returns_all() {
9355 let records = vec![make_record("a"), make_record("b")];
9356 let p = AuthenticatedPrincipal::bearer(
9357 solo_core::TenantId::new("a").unwrap(),
9358 );
9359 let out = filter_tenants_for_principal(records, Some(&p));
9360 assert_eq!(out.len(), 2);
9361 }
9362
9363 #[test]
9364 fn filter_oidc_principal_keeps_only_claim() {
9365 let records = vec![make_record("a"), make_record("b"), make_record("c")];
9366 let p = AuthenticatedPrincipal {
9368 subject: "alice@example.com".to_string(),
9369 tenant_claim: Some(solo_core::TenantId::new("b").unwrap()),
9370 scopes: vec!["read".to_string()],
9371 claims: serde_json::json!({ "sub": "alice@example.com" }),
9372 };
9373 let out = filter_tenants_for_principal(records, Some(&p));
9374 assert_eq!(out.len(), 1);
9375 assert_eq!(out[0].tenant_id.as_str(), "b");
9376 }
9377
9378 #[test]
9379 fn filter_oidc_principal_with_no_claim_returns_empty() {
9380 let records = vec![make_record("a")];
9383 let p = AuthenticatedPrincipal {
9384 subject: "alice@example.com".to_string(),
9385 tenant_claim: None,
9386 scopes: vec![],
9387 claims: serde_json::json!({ "sub": "alice@example.com" }),
9388 };
9389 let out = filter_tenants_for_principal(records, Some(&p));
9390 assert!(out.is_empty());
9391 }
9392
9393 #[test]
9394 fn is_single_principal_bearer_discriminator() {
9395 let bearer = AuthenticatedPrincipal::bearer(
9396 solo_core::TenantId::new("default").unwrap(),
9397 );
9398 assert!(is_single_principal_bearer(&bearer));
9399
9400 let oidc = AuthenticatedPrincipal {
9401 subject: "alice".to_string(),
9402 tenant_claim: Some(solo_core::TenantId::new("alice").unwrap()),
9403 scopes: vec![],
9404 claims: serde_json::json!({ "x": 1 }),
9405 };
9406 assert!(!is_single_principal_bearer(&oidc));
9407
9408 let weird = AuthenticatedPrincipal {
9412 subject: "bearer".to_string(),
9413 tenant_claim: Some(solo_core::TenantId::default_tenant()),
9414 scopes: vec![],
9415 claims: serde_json::json!({ "leak": 1 }),
9416 };
9417 assert!(!is_single_principal_bearer(&weird));
9418 }
9419}
9420
#[cfg(test)]
mod cors_tests {
    use super::is_localhost_origin;

    /// Asserts that `is_localhost_origin` accepts every origin in `origins`.
    fn assert_accepted(origins: &[&str]) {
        for &origin in origins {
            assert!(is_localhost_origin(origin), "expected {origin:?} to be accepted");
        }
    }

    /// Asserts that `is_localhost_origin` rejects every origin in `origins`.
    fn assert_rejected(origins: &[&str]) {
        for &origin in origins {
            assert!(!is_localhost_origin(origin), "expected {origin:?} to be rejected");
        }
    }

    #[test]
    fn accepts_canonical_localhost_origins() {
        // Loopback hosts over http/https, with and without ports, including the
        // bracketed IPv6 loopback form.
        assert_accepted(&[
            "http://localhost",
            "http://localhost:3000",
            "https://localhost:8443",
            "http://127.0.0.1",
            "http://127.0.0.1:5173",
            "http://[::1]",
            "http://[::1]:8080",
        ]);
    }

    #[test]
    fn rejects_remote_origins() {
        // Public hostnames and private-but-not-loopback addresses.
        assert_rejected(&[
            "http://example.com",
            "https://malicious.example",
            "http://192.168.1.5",
            "http://10.0.0.1",
        ]);
    }

    #[test]
    fn rejects_dns_rebinding_tricks() {
        // Hostnames that merely contain a loopback name as a prefix or suffix.
        assert_rejected(&[
            "http://127.0.0.1.nip.io",
            "http://localhost.evil.com",
            "http://evil.localhost",
        ]);
    }

    #[test]
    fn rejects_non_http_schemes() {
        assert_rejected(&["file:///", "ws://localhost:3000", "javascript:alert(1)"]);
    }

    #[test]
    fn rejects_malformed() {
        // Empty input, a bare host, and a scheme-relative origin.
        assert_rejected(&["", "localhost", "//localhost"]);
    }
}
9468