1use axum::{
4 extract::{Query, State},
5 Json,
6};
7use serde::Deserialize;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11use crate::error::{Result, WebError};
12use crate::models::{
13 AdaptiveDreamState, CognitionOverviewResponse, DashboardResponse, DigestEntry,
14 DigestFreshnessState, DigestListResponse, DreamState, JobEntry, JobListResponse,
15 JobSummaryResponse, QueryIntrospectionResponse, RecallComposition, ReflectionSampleEntry,
16 ReflectionStateResponse, RuntimeResponse,
17};
18use crate::state::AppState;
19use nexus_core::CognitiveLevel;
20
/// Query-string parameters accepted by [`list_jobs`].
///
/// Note: the derived `Default` yields `limit == 0`, whereas deserialization
/// falls back to [`default_limit`] when `limit` is omitted.
#[derive(Debug, Deserialize, Default)]
pub struct JobQueryParams {
    /// Name of the namespace whose jobs are listed; `list_jobs` rejects a blank value.
    pub namespace: String,
    /// Optional status filter, passed through to the repository unchanged.
    pub status: Option<String>,
    /// Optional job-type filter, passed through to the repository unchanged.
    pub job_type: Option<String>,
    /// Requested page size; `list_jobs` clamps it to `1..=200`.
    #[serde(default = "default_limit")]
    pub limit: i64,
    /// Number of rows to skip; `list_jobs` raises negative values to 0.
    #[serde(default)]
    pub offset: i64,
}
33
/// Query-string parameters accepted by [`job_summary`].
#[derive(Debug, Deserialize, Default)]
pub struct JobSummaryQueryParams {
    /// Name of the namespace to summarise; `job_summary` rejects a blank value.
    pub namespace: String,
    /// Optional job-type filter applied before counting by status.
    pub job_type: Option<String>,
}
39
/// Query-string parameters accepted by [`list_digests`].
///
/// Note: the derived `Default` yields `limit == 0`, whereas deserialization
/// falls back to [`default_limit`] when `limit` is omitted.
#[derive(Debug, Deserialize, Default)]
pub struct DigestQueryParams {
    /// Name of the namespace whose digests are listed; `list_digests` rejects a blank value.
    pub namespace: String,
    /// Optional session-key filter, passed through to the repository unchanged.
    pub session_key: Option<String>,
    /// Requested page size; `list_digests` clamps it to `1..=200`.
    #[serde(default = "default_limit")]
    pub limit: i64,
    /// Number of rows to skip; `list_digests` raises negative values to 0.
    #[serde(default)]
    pub offset: i64,
}
49
/// Serde fallback for the `limit` query parameter when the caller omits it.
fn default_limit() -> i64 {
    const DEFAULT_PAGE_SIZE: i64 = 50;
    DEFAULT_PAGE_SIZE
}
53
54pub async fn list_jobs(
58 State(state): State<Arc<RwLock<AppState>>>,
59 Query(params): Query<JobQueryParams>,
60) -> Result<Json<JobListResponse>> {
61 if params.namespace.trim().is_empty() {
62 return Err(WebError::InvalidRequest(
63 "namespace query parameter is required".to_string(),
64 ));
65 }
66
67 let state = state.read().await;
68
69 let namespace = state
70 .namespace_repo
71 .get_by_name(¶ms.namespace)
72 .await?
73 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
74
75 let limit = params.limit.clamp(1, 200);
76 let offset = params.offset.max(0);
77
78 let rows = state
79 .memory_repo
80 .list_jobs(
81 namespace.id,
82 params.job_type.as_deref(),
83 params.status.as_deref(),
84 limit,
85 offset,
86 )
87 .await?;
88
89 let total = state
90 .memory_repo
91 .count_jobs(
92 namespace.id,
93 params.job_type.as_deref(),
94 params.status.as_deref(),
95 )
96 .await?;
97
98 let jobs: Vec<JobEntry> = rows.into_iter().map(JobEntry::from).collect();
99
100 Ok(Json(JobListResponse {
101 success: true,
102 namespace: params.namespace,
103 jobs,
104 total,
105 }))
106}
107
108pub async fn job_summary(
110 State(state): State<Arc<RwLock<AppState>>>,
111 Query(params): Query<JobSummaryQueryParams>,
112) -> Result<Json<JobSummaryResponse>> {
113 if params.namespace.trim().is_empty() {
114 return Err(WebError::InvalidRequest(
115 "namespace query parameter is required".to_string(),
116 ));
117 }
118
119 let state = state.read().await;
120
121 let namespace = state
122 .namespace_repo
123 .get_by_name(¶ms.namespace)
124 .await?
125 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
126
127 let rows = state
128 .memory_repo
129 .count_jobs_by_status(namespace.id, params.job_type.as_deref())
130 .await?;
131
132 let counts = rows.into_iter().collect();
133
134 Ok(Json(JobSummaryResponse {
135 success: true,
136 namespace: params.namespace,
137 counts,
138 }))
139}
140
141pub async fn list_digests(
143 State(state): State<Arc<RwLock<AppState>>>,
144 Query(params): Query<DigestQueryParams>,
145) -> Result<Json<DigestListResponse>> {
146 if params.namespace.trim().is_empty() {
147 return Err(WebError::InvalidRequest(
148 "namespace query parameter is required".to_string(),
149 ));
150 }
151
152 let state = state.read().await;
153
154 let namespace = state
155 .namespace_repo
156 .get_by_name(¶ms.namespace)
157 .await?
158 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
159
160 let limit = params.limit.clamp(1, 200);
161 let offset = params.offset.max(0);
162
163 let rows = state
164 .memory_repo
165 .list_digests(namespace.id, params.session_key.as_deref(), limit, offset)
166 .await?;
167
168 let total = state
169 .memory_repo
170 .count_digests(namespace.id, params.session_key.as_deref())
171 .await?;
172
173 let digests: Vec<DigestEntry> = rows.into_iter().map(DigestEntry::from).collect();
174
175 Ok(Json(DigestListResponse {
176 success: true,
177 namespace: params.namespace,
178 digests,
179 total,
180 }))
181}
182
183pub async fn runtime_health(
185 State(state): State<Arc<RwLock<AppState>>>,
186) -> Result<Json<RuntimeResponse>> {
187 let state = state.read().await;
188
189 let db_connected = sqlx::query_scalar::<_, i64>("SELECT 1")
191 .fetch_one(state.pool())
192 .await
193 .is_ok();
194
195 let agent_enabled = state.agent_supervisor.is_some();
196 let active_sessions = state.orchestrator.active_session_count().await;
197
198 Ok(Json(RuntimeResponse {
199 success: true,
200 version: env!("CARGO_PKG_VERSION").to_string(),
201 uptime_seconds: state.uptime_seconds(),
202 db_connected,
203 agent_enabled,
204 active_sessions,
205 }))
206}
207
/// Query-string parameters accepted by [`cognition_overview`].
#[derive(Debug, Deserialize, Default)]
pub struct OverviewQueryParams {
    /// Name of the namespace to summarise; `cognition_overview` rejects a blank value.
    pub namespace: String,
}
212
/// Query-string parameters accepted by [`reflection_state`].
///
/// Note: the derived `Default` yields `limit == 0`, whereas deserialization
/// falls back to [`default_limit`] when `limit` is omitted.
#[derive(Debug, Deserialize, Default)]
pub struct ReflectionQueryParams {
    /// Name of the namespace to inspect; `reflection_state` rejects a blank value.
    pub namespace: String,
    /// Maximum number of sample entries per level; `reflection_state` clamps it to `1..=50`.
    #[serde(default = "default_limit")]
    pub limit: i64,
}
219
220pub async fn cognition_overview(
222 State(state): State<Arc<RwLock<AppState>>>,
223 Query(params): Query<OverviewQueryParams>,
224) -> Result<Json<CognitionOverviewResponse>> {
225 if params.namespace.trim().is_empty() {
226 return Err(WebError::InvalidRequest(
227 "namespace query parameter is required".to_string(),
228 ));
229 }
230
231 let state = state.read().await;
232
233 let namespace = state
234 .namespace_repo
235 .get_by_name(¶ms.namespace)
236 .await?
237 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
238
239 let status_rows = state
240 .memory_repo
241 .count_jobs_by_status(namespace.id, None)
242 .await?;
243 let jobs_by_status: std::collections::HashMap<String, i64> = status_rows.into_iter().collect();
244
245 let digest_count = state.memory_repo.count_digests(namespace.id, None).await?;
246
247 let evidence_count = state.memory_repo.count_evidence(namespace.id).await?;
248 let stage_metrics = state
249 .memory_repo
250 .latest_metrics_for_namespace(namespace.id, Some("cognition."), 64)
251 .await?
252 .into_iter()
253 .fold(
254 std::collections::HashMap::new(),
255 |mut acc: std::collections::HashMap<String, f64>, metric| {
256 acc.entry(metric.metric_name).or_insert(metric.metric_value);
257 acc
258 },
259 );
260
261 Ok(Json(CognitionOverviewResponse {
262 success: true,
263 namespace: params.namespace,
264 jobs_by_status,
265 digest_count,
266 evidence_count,
267 stage_metrics,
268 }))
269}
270
271pub async fn reflection_state(
273 State(state): State<Arc<RwLock<AppState>>>,
274 Query(params): Query<ReflectionQueryParams>,
275) -> Result<Json<ReflectionStateResponse>> {
276 if params.namespace.trim().is_empty() {
277 return Err(WebError::InvalidRequest(
278 "namespace query parameter is required".to_string(),
279 ));
280 }
281
282 let state = state.read().await;
283
284 let namespace = state
285 .namespace_repo
286 .get_by_name(¶ms.namespace)
287 .await?
288 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
289
290 let limit = params.limit.clamp(1, 50);
291 let contradiction_count = state
292 .memory_repo
293 .count_by_cognitive_level(namespace.id, CognitiveLevel::Contradiction)
294 .await?;
295 let derived_count = state
296 .memory_repo
297 .count_by_cognitive_level(namespace.id, CognitiveLevel::Derived)
298 .await?;
299 let recent_contradictions = state
300 .memory_repo
301 .get_by_cognitive_level(namespace.id, CognitiveLevel::Contradiction, limit)
302 .await?
303 .into_iter()
304 .map(ReflectionSampleEntry::from)
305 .collect();
306 let recent_derived = state
307 .memory_repo
308 .get_by_cognitive_level(namespace.id, CognitiveLevel::Derived, limit)
309 .await?
310 .into_iter()
311 .map(ReflectionSampleEntry::from)
312 .collect();
313
314 Ok(Json(ReflectionStateResponse {
315 success: true,
316 namespace: params.namespace,
317 contradiction_count,
318 derived_count,
319 recent_contradictions,
320 recent_derived,
321 }))
322}
323
/// Query-string parameters accepted by [`query_introspection`].
#[derive(Debug, Deserialize, Default)]
pub struct QueryIntrospectionQueryParams {
    /// Name of the namespace to query; `query_introspection` rejects a blank value.
    pub namespace: String,
    /// Question to introspect; `query_introspection` rejects a blank value.
    pub question: String,
}
331
332pub async fn query_introspection(
338 State(state): State<Arc<RwLock<AppState>>>,
339 Query(params): Query<QueryIntrospectionQueryParams>,
340) -> Result<Json<QueryIntrospectionResponse>> {
341 if params.namespace.trim().is_empty() {
342 return Err(WebError::InvalidRequest(
343 "namespace query parameter is required".to_string(),
344 ));
345 }
346 if params.question.trim().is_empty() {
347 return Err(WebError::InvalidRequest(
348 "question query parameter is required".to_string(),
349 ));
350 }
351
352 let state = state.read().await;
353
354 let namespace = state
355 .namespace_repo
356 .get_by_name(¶ms.namespace)
357 .await?
358 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
359
360 let query_context_limit = nexus_core::Config::from_env()
361 .map(|config| config.agent.query_context_limit)
362 .unwrap_or_else(|_| nexus_core::config::AgentConfig::default().query_context_limit);
363
364 let request = nexus_core::WorkingRepresentationRequest {
365 namespace_id: namespace.id,
366 perspective: None,
367 query: Some(params.question.clone()),
368 max_items: query_context_limit,
369 include_raw: false,
370 ..nexus_core::WorkingRepresentationRequest::default()
371 };
372
373 let introspection =
374 nexus_agent::introspect_query(&request, ¶ms.question, &state.memory_repo)
375 .await
376 .map_err(|e| WebError::Storage(format!("Introspection failed: {}", e)))?;
377
378 Ok(Json(QueryIntrospectionResponse {
379 success: true,
380 namespace: params.namespace,
381 question: params.question,
382 introspection,
383 }))
384}
385
/// Query-string parameters accepted by [`dashboard`].
#[derive(Debug, Deserialize, Default)]
pub struct DashboardQueryParams {
    /// Name of the namespace to report on; `dashboard` rejects a blank value.
    pub namespace: String,
}
392
393pub async fn dashboard(
395 State(state): State<Arc<RwLock<AppState>>>,
396 Query(params): Query<DashboardQueryParams>,
397) -> Result<Json<DashboardResponse>> {
398 if params.namespace.trim().is_empty() {
399 return Err(WebError::InvalidRequest(
400 "namespace query parameter is required".to_string(),
401 ));
402 }
403
404 let state = state.read().await;
405
406 let namespace = state
407 .namespace_repo
408 .get_by_name(¶ms.namespace)
409 .await?
410 .ok_or_else(|| WebError::NotFound(format!("Namespace '{}' not found", params.namespace)))?;
411
412 let completed_reflections = state
414 .memory_repo
415 .count_jobs(namespace.id, Some("reflect_namespace"), Some("completed"))
416 .await?
417 + state
418 .memory_repo
419 .count_jobs(namespace.id, Some("reflect_perspective"), Some("completed"))
420 .await?;
421 let completed_digests = state
422 .memory_repo
423 .count_jobs(namespace.id, Some("digest_session"), Some("completed"))
424 .await?;
425 let failed_jobs = state
426 .memory_repo
427 .count_jobs(namespace.id, None, Some("failed"))
428 .await?;
429 let pending_jobs = state
430 .memory_repo
431 .count_jobs(namespace.id, None, Some("enqueued"))
432 .await?;
433
434 let last_dream_at = {
436 let reflect_jobs = state
437 .memory_repo
438 .list_jobs(
439 namespace.id,
440 Some("reflect_namespace"),
441 Some("completed"),
442 1,
443 0,
444 )
445 .await
446 .unwrap_or_default();
447 let digest_jobs = state
448 .memory_repo
449 .list_jobs(
450 namespace.id,
451 Some("digest_session"),
452 Some("completed"),
453 1,
454 0,
455 )
456 .await
457 .unwrap_or_default();
458 let most_recent = reflect_jobs
459 .iter()
460 .chain(digest_jobs.iter())
461 .max_by_key(|j| j.updated_at.as_str());
462 most_recent.map(|j| j.updated_at.clone())
463 };
464
465 let total_digests = state.memory_repo.count_digests(namespace.id, None).await?;
467 let sessions_with_cognition = state
468 .memory_repo
469 .count_distinct_session_keys_with_cognition(namespace.id)
470 .await?;
471
472 let (latest_digest_at, latest_digest_age_seconds) = {
473 let recent = state
474 .memory_repo
475 .list_digests(namespace.id, None, 1, 0)
476 .await
477 .unwrap_or_default();
478 match recent.into_iter().next() {
479 Some(d) => {
480 let age = chrono::Utc::now()
481 .signed_duration_since(
482 chrono::DateTime::parse_from_rfc3339(&d.created_at)
483 .unwrap_or_else(|_| chrono::Utc::now().into()),
484 )
485 .num_seconds();
486 (Some(d.created_at), Some(age.max(0)))
487 }
488 None => (None, None),
489 }
490 };
491
492 let raw = state
494 .memory_repo
495 .count_by_cognitive_level(namespace.id, CognitiveLevel::Raw)
496 .await?;
497 let explicit = state
498 .memory_repo
499 .count_by_cognitive_level(namespace.id, CognitiveLevel::Explicit)
500 .await?;
501 let derived = state
502 .memory_repo
503 .count_by_cognitive_level(namespace.id, CognitiveLevel::Derived)
504 .await?;
505 let summary_short = state
506 .memory_repo
507 .count_by_cognitive_level(namespace.id, CognitiveLevel::SummaryShort)
508 .await?;
509 let summary_long = state
510 .memory_repo
511 .count_by_cognitive_level(namespace.id, CognitiveLevel::SummaryLong)
512 .await?;
513 let contradiction = state
514 .memory_repo
515 .count_by_cognitive_level(namespace.id, CognitiveLevel::Contradiction)
516 .await?;
517 let total = raw + explicit + derived + summary_short + summary_long + contradiction;
518
519 let cognition_config = nexus_core::Config::from_env()
521 .map(|c| c.cognition)
522 .unwrap_or_default();
523 let contradiction_density = if total > 0 {
524 contradiction as f64 / total as f64
525 } else {
526 0.0
527 };
528
529 let base_interval = cognition_config.adaptive_dream_min_interval_secs;
530 let factor = 1.0 - ((contradiction as f32 * 0.10).min(0.9));
531 let adapted = (base_interval as f32 * factor) as u64;
532 let current_interval_secs = adapted.clamp(
533 cognition_config.adaptive_dream_min_interval_secs,
534 cognition_config.adaptive_dream_max_interval_secs,
535 );
536
537 Ok(Json(DashboardResponse {
538 success: true,
539 namespace: params.namespace,
540 dream: DreamState {
541 completed_reflections,
542 completed_digests,
543 failed_jobs,
544 pending_jobs,
545 last_dream_at,
546 },
547 digest: DigestFreshnessState {
548 total_digests,
549 sessions_with_cognition,
550 latest_digest_age_seconds,
551 latest_digest_at,
552 },
553 recall: RecallComposition {
554 raw,
555 explicit,
556 derived,
557 summary_short,
558 summary_long,
559 contradiction,
560 total,
561 },
562 adaptive: AdaptiveDreamState {
563 enabled: cognition_config.adaptive_dream_enabled,
564 current_interval_secs,
565 min_interval_secs: cognition_config.adaptive_dream_min_interval_secs,
566 max_interval_secs: cognition_config.adaptive_dream_max_interval_secs,
567 contradiction_count: contradiction,
568 contradiction_density,
569 },
570 }))
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576 use axum::body::Body;
577 use axum::http::{Request, StatusCode};
578 use axum::routing::get;
579 use axum::Router;
580 use nexus_orchestrator::Orchestrator;
581 use serde_json::Value;
582 use std::sync::Arc;
583 use tower::ServiceExt;
584
    /// Test fixture pairing the router under test with the shared state behind it.
    struct TestApp {
        /// Router with all cognition routes registered.
        app: Router,
        /// Handle to the same state the router uses, for seeding data directly.
        state: Arc<RwLock<crate::state::AppState>>,
    }
589
590 async fn test_app() -> TestApp {
591 let pool = sqlx::SqlitePool::connect("sqlite::memory:")
592 .await
593 .expect("connect to in-memory db");
594 nexus_storage::migrations::run_migrations(&pool)
595 .await
596 .expect("run migrations");
597
598 let mut storage = nexus_storage::StorageManager::new(pool.clone());
599 storage.initialize().await.expect("initialize storage");
600
601 let orchestrator = Orchestrator::default();
602 let state = Arc::new(RwLock::new(
603 crate::state::AppState::new(storage, orchestrator)
604 .await
605 .expect("create app state"),
606 ));
607
608 let app = Router::new()
609 .route("/api/cognition/jobs", get(list_jobs))
610 .route("/api/cognition/jobs/summary", get(job_summary))
611 .route("/api/cognition/digests", get(list_digests))
612 .route("/api/cognition/overview", get(cognition_overview))
613 .route("/api/cognition/reflection", get(reflection_state))
614 .route("/api/cognition/runtime", get(runtime_health))
615 .route(
616 "/api/cognition/query-introspection",
617 get(query_introspection),
618 )
619 .route("/api/cognition/dashboard", get(dashboard))
620 .with_state(state.clone());
621
622 TestApp { app, state }
623 }
624
625 async fn create_namespace_in_test(state: &Arc<RwLock<crate::state::AppState>>, name: &str) {
627 let s = state.read().await;
628 s.namespace_repo
629 .get_or_create(name, "test-agent")
630 .await
631 .expect("create namespace");
632 }
633
634 fn body_to_json(body: axum::body::Bytes) -> Value {
636 serde_json::from_slice(&body).expect("valid JSON")
637 }
638
639 #[tokio::test]
640 async fn test_runtime_returns_honest_fields() {
641 let test = test_app().await;
642 let resp = test
643 .app
644 .oneshot(
645 Request::builder()
646 .uri("/api/cognition/runtime")
647 .body(Body::empty())
648 .unwrap(),
649 )
650 .await
651 .unwrap();
652
653 assert_eq!(resp.status(), StatusCode::OK);
654 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
655 .await
656 .unwrap();
657 let json = body_to_json(body);
658
659 assert_eq!(json["success"], true);
660 assert!(!json["version"].as_str().unwrap().is_empty());
661 assert!(json["uptime_seconds"].as_u64().is_some());
662 assert!(json["db_connected"].is_boolean());
663 assert!(json["agent_enabled"].is_boolean());
664 }
665
666 #[tokio::test]
667 async fn test_query_introspection_missing_question_returns_400() {
668 let test = test_app().await;
669 create_namespace_in_test(&test.state, "intro-missing").await;
670
671 let resp = test
672 .app
673 .oneshot(
674 Request::builder()
675 .uri("/api/cognition/query-introspection?namespace=intro-missing")
676 .body(Body::empty())
677 .unwrap(),
678 )
679 .await
680 .unwrap();
681
682 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
683 }
684
685 #[tokio::test]
686 async fn test_query_introspection_returns_structured_payload() {
687 let test = test_app().await;
688 create_namespace_in_test(&test.state, "intro-ns").await;
689
690 {
691 let state = test.state.read().await;
692 let namespace = state
693 .namespace_repo
694 .get_by_name("intro-ns")
695 .await
696 .unwrap()
697 .unwrap();
698
699 state
700 .memory_repo
701 .store(nexus_storage::StoreMemoryParams {
702 namespace_id: namespace.id,
703 content: "Authentication now uses session cookies with http-only flags.",
704 category: &nexus_core::MemoryCategory::Facts,
705 memory_lane_type: None,
706 labels: &[],
707 metadata: &serde_json::json!({
708 "cognitive": {
709 "level": "explicit",
710 "observer": "claude-code",
711 "subject": "claude-code",
712 "generated_by": "test_fixture",
713 "confidence": 0.92
714 }
715 }),
716 embedding: None,
717 embedding_model: None,
718 })
719 .await
720 .unwrap();
721 }
722
723 let resp = test
724 .app
725 .oneshot(
726 Request::builder()
727 .uri("/api/cognition/query-introspection?namespace=intro-ns&question=session%20cookies")
728 .body(Body::empty())
729 .unwrap(),
730 )
731 .await
732 .unwrap();
733
734 assert_eq!(resp.status(), StatusCode::OK);
735 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
736 .await
737 .unwrap();
738 let json = body_to_json(body);
739
740 assert_eq!(json["success"], true);
741 assert_eq!(json["namespace"], "intro-ns");
742 assert_eq!(json["question"], "session cookies");
743 assert!(json["introspection"]["included"].is_array());
744 assert!(!json["introspection"]["included"]
745 .as_array()
746 .unwrap()
747 .is_empty());
748 assert!(json["introspection"]["bucket_stats"].is_array());
749 }
750
751 #[tokio::test]
752 async fn test_jobs_missing_namespace_returns_400() {
753 let test = test_app().await;
754 let resp = test
755 .app
756 .oneshot(
757 Request::builder()
758 .uri("/api/cognition/jobs")
759 .body(Body::empty())
760 .unwrap(),
761 )
762 .await
763 .unwrap();
764 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
765 }
766
767 #[tokio::test]
768 async fn test_jobs_unknown_namespace_returns_404() {
769 let test = test_app().await;
770 let resp = test
771 .app
772 .oneshot(
773 Request::builder()
774 .uri("/api/cognition/jobs?namespace=nonexistent")
775 .body(Body::empty())
776 .unwrap(),
777 )
778 .await
779 .unwrap();
780 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
781 }
782
783 #[tokio::test]
784 async fn test_reflection_missing_namespace_returns_400() {
785 let test = test_app().await;
786 let resp = test
787 .app
788 .oneshot(
789 Request::builder()
790 .uri("/api/cognition/reflection")
791 .body(Body::empty())
792 .unwrap(),
793 )
794 .await
795 .unwrap();
796 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
797 }
798
799 #[tokio::test]
800 async fn test_reflection_returns_counts_and_samples() {
801 let test = test_app().await;
802 create_namespace_in_test(&test.state, "reflect-ns").await;
803
804 {
805 let state = test.state.read().await;
806 let namespace = state
807 .namespace_repo
808 .get_by_name("reflect-ns")
809 .await
810 .unwrap()
811 .unwrap();
812
813 for (content, level) in [
814 ("derived insight", CognitiveLevel::Derived),
815 ("contradiction note", CognitiveLevel::Contradiction),
816 ] {
817 state
818 .memory_repo
819 .store(nexus_storage::repository::StoreMemoryParams {
820 namespace_id: namespace.id,
821 content,
822 category: &nexus_core::MemoryCategory::Facts,
823 memory_lane_type: None,
824 labels: &[],
825 metadata: &serde_json::json!({
826 "cognitive": {
827 "level": level.as_str(),
828 "observer": "claude-code",
829 "subject": "claude-code",
830 "confidence": 0.9,
831 "generated_by": "test"
832 }
833 }),
834 embedding: None,
835 embedding_model: None,
836 })
837 .await
838 .unwrap();
839 }
840 }
841
842 let resp = test
843 .app
844 .oneshot(
845 Request::builder()
846 .uri("/api/cognition/reflection?namespace=reflect-ns")
847 .body(Body::empty())
848 .unwrap(),
849 )
850 .await
851 .unwrap();
852
853 assert_eq!(resp.status(), StatusCode::OK);
854 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
855 .await
856 .unwrap();
857 let json = body_to_json(body);
858
859 assert_eq!(json["success"], true);
860 assert_eq!(json["derived_count"], 1);
861 assert_eq!(json["contradiction_count"], 1);
862 assert_eq!(json["recent_derived"][0]["content"], "derived insight");
863 assert_eq!(
864 json["recent_contradictions"][0]["content"],
865 "contradiction note"
866 );
867 }
868
869 #[tokio::test]
870 async fn test_overview_returns_latest_stage_metrics() {
871 let test = test_app().await;
872 create_namespace_in_test(&test.state, "overview-ns").await;
873
874 {
875 let state = test.state.read().await;
876 let namespace = state
877 .namespace_repo
878 .get_by_name("overview-ns")
879 .await
880 .unwrap()
881 .unwrap();
882
883 state
884 .memory_repo
885 .record_metric(
886 "cognition.query.total_ms",
887 11.0,
888 &serde_json::json!({"namespace_id": namespace.id, "stage": "total", "unit": "ms"}),
889 )
890 .await
891 .unwrap();
892 state
893 .memory_repo
894 .record_metric(
895 "cognition.query.total_ms",
896 15.5,
897 &serde_json::json!({"namespace_id": namespace.id, "stage": "total", "unit": "ms"}),
898 )
899 .await
900 .unwrap();
901 state
902 .memory_repo
903 .record_metric(
904 "cognition.dream.total_ms",
905 44.0,
906 &serde_json::json!({"namespace_id": namespace.id, "stage": "total", "unit": "ms"}),
907 )
908 .await
909 .unwrap();
910 }
911
912 let resp = test
913 .app
914 .oneshot(
915 Request::builder()
916 .uri("/api/cognition/overview?namespace=overview-ns")
917 .body(Body::empty())
918 .unwrap(),
919 )
920 .await
921 .unwrap();
922
923 assert_eq!(resp.status(), StatusCode::OK);
924 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
925 .await
926 .unwrap();
927 let json = body_to_json(body);
928
929 assert_eq!(json["success"], true);
930 assert_eq!(json["stage_metrics"]["cognition.query.total_ms"], 15.5);
931 assert_eq!(json["stage_metrics"]["cognition.dream.total_ms"], 44.0);
932 }
933
934 #[tokio::test]
935 async fn test_digests_missing_namespace_returns_400() {
936 let test = test_app().await;
937 let resp = test
938 .app
939 .oneshot(
940 Request::builder()
941 .uri("/api/cognition/digests")
942 .body(Body::empty())
943 .unwrap(),
944 )
945 .await
946 .unwrap();
947 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
948 }
949
950 #[tokio::test]
951 async fn test_overview_unknown_namespace_returns_404() {
952 let test = test_app().await;
953 let resp = test
954 .app
955 .oneshot(
956 Request::builder()
957 .uri("/api/cognition/overview?namespace=nope")
958 .body(Body::empty())
959 .unwrap(),
960 )
961 .await
962 .unwrap();
963 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
964 }
965
966 #[tokio::test]
967 async fn test_jobs_with_existing_namespace_returns_empty_list() {
968 let test = test_app().await;
969 create_namespace_in_test(&test.state, "jobs-test-ns").await;
970
971 let resp = test
972 .app
973 .oneshot(
974 Request::builder()
975 .uri("/api/cognition/jobs?namespace=jobs-test-ns")
976 .body(Body::empty())
977 .unwrap(),
978 )
979 .await
980 .unwrap();
981
982 assert_eq!(resp.status(), StatusCode::OK);
983 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
984 .await
985 .unwrap();
986 let json = body_to_json(body);
987
988 assert_eq!(json["success"], true);
989 assert_eq!(json["namespace"], "jobs-test-ns");
990 assert_eq!(json["jobs"], Value::Array(vec![]));
991 assert_eq!(json["total"], 0);
992 }
993
994 #[tokio::test]
995 async fn test_jobs_reports_total_matching_rows_not_page_len() {
996 let test = test_app().await;
997 create_namespace_in_test(&test.state, "jobs-page-ns").await;
998
999 {
1000 let state = test.state.read().await;
1001 let namespace = state
1002 .namespace_repo
1003 .get_by_name("jobs-page-ns")
1004 .await
1005 .unwrap()
1006 .unwrap();
1007
1008 for idx in 0..3 {
1009 state
1010 .memory_repo
1011 .enqueue_job(nexus_storage::EnqueueJobParams {
1012 namespace_id: namespace.id,
1013 job_type: "derive",
1014 priority: 10 - idx,
1015 perspective: None,
1016 payload: &serde_json::json!({ "idx": idx }),
1017 })
1018 .await
1019 .unwrap();
1020 }
1021 }
1022
1023 let resp = test
1024 .app
1025 .oneshot(
1026 Request::builder()
1027 .uri("/api/cognition/jobs?namespace=jobs-page-ns&limit=2")
1028 .body(Body::empty())
1029 .unwrap(),
1030 )
1031 .await
1032 .unwrap();
1033
1034 assert_eq!(resp.status(), StatusCode::OK);
1035 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1036 .await
1037 .unwrap();
1038 let json = body_to_json(body);
1039
1040 assert_eq!(json["success"], true);
1041 assert_eq!(json["jobs"].as_array().unwrap().len(), 2);
1042 assert_eq!(json["total"], 3);
1043 }
1044
1045 #[tokio::test]
1046 async fn test_overview_with_existing_namespace() {
1047 let test = test_app().await;
1048 create_namespace_in_test(&test.state, "overview-test-ns").await;
1049
1050 let resp = test
1051 .app
1052 .oneshot(
1053 Request::builder()
1054 .uri("/api/cognition/overview?namespace=overview-test-ns")
1055 .body(Body::empty())
1056 .unwrap(),
1057 )
1058 .await
1059 .unwrap();
1060
1061 assert_eq!(resp.status(), StatusCode::OK);
1062 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1063 .await
1064 .unwrap();
1065 let json = body_to_json(body);
1066
1067 assert_eq!(json["success"], true);
1068 assert_eq!(json["namespace"], "overview-test-ns");
1069 assert_eq!(json["digest_count"], 0);
1070 assert_eq!(json["evidence_count"], 0);
1071 }
1072
1073 #[tokio::test]
1074 async fn test_job_summary_with_existing_namespace() {
1075 let test = test_app().await;
1076 create_namespace_in_test(&test.state, "summary-test-ns").await;
1077
1078 let resp = test
1079 .app
1080 .oneshot(
1081 Request::builder()
1082 .uri("/api/cognition/jobs/summary?namespace=summary-test-ns")
1083 .body(Body::empty())
1084 .unwrap(),
1085 )
1086 .await
1087 .unwrap();
1088
1089 assert_eq!(resp.status(), StatusCode::OK);
1090 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1091 .await
1092 .unwrap();
1093 let json = body_to_json(body);
1094
1095 assert_eq!(json["success"], true);
1096 assert_eq!(json["namespace"], "summary-test-ns");
1097 assert!(json["counts"].is_object());
1098 }
1099
1100 #[tokio::test]
1101 async fn test_job_summary_returns_real_counts() {
1102 let test = test_app().await;
1103 create_namespace_in_test(&test.state, "summary-data-ns").await;
1104
1105 {
1106 let state = test.state.read().await;
1107 let namespace = state
1108 .namespace_repo
1109 .get_by_name("summary-data-ns")
1110 .await
1111 .unwrap()
1112 .unwrap();
1113
1114 state
1115 .memory_repo
1116 .enqueue_job(nexus_storage::EnqueueJobParams {
1117 namespace_id: namespace.id,
1118 job_type: "derive",
1119 priority: 10,
1120 perspective: None,
1121 payload: &serde_json::json!({ "idx": 1 }),
1122 })
1123 .await
1124 .unwrap();
1125 state
1126 .memory_repo
1127 .enqueue_job(nexus_storage::EnqueueJobParams {
1128 namespace_id: namespace.id,
1129 job_type: "digest",
1130 priority: 5,
1131 perspective: None,
1132 payload: &serde_json::json!({ "idx": 2 }),
1133 })
1134 .await
1135 .unwrap();
1136 }
1137
1138 let resp = test
1139 .app
1140 .oneshot(
1141 Request::builder()
1142 .uri("/api/cognition/jobs/summary?namespace=summary-data-ns")
1143 .body(Body::empty())
1144 .unwrap(),
1145 )
1146 .await
1147 .unwrap();
1148
1149 assert_eq!(resp.status(), StatusCode::OK);
1150 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1151 .await
1152 .unwrap();
1153 let json = body_to_json(body);
1154
1155 assert_eq!(json["success"], true);
1156 assert_eq!(json["counts"]["pending"], 2);
1157 }
1158
1159 #[tokio::test]
1160 async fn test_digests_with_existing_namespace() {
1161 let test = test_app().await;
1162 create_namespace_in_test(&test.state, "digest-test-ns").await;
1163
1164 let resp = test
1165 .app
1166 .oneshot(
1167 Request::builder()
1168 .uri("/api/cognition/digests?namespace=digest-test-ns")
1169 .body(Body::empty())
1170 .unwrap(),
1171 )
1172 .await
1173 .unwrap();
1174
1175 assert_eq!(resp.status(), StatusCode::OK);
1176 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1177 .await
1178 .unwrap();
1179 let json = body_to_json(body);
1180
1181 assert_eq!(json["success"], true);
1182 assert_eq!(json["namespace"], "digest-test-ns");
1183 assert_eq!(json["total"], 0);
1184 }
1185
1186 #[tokio::test]
1187 async fn test_digests_support_pagination_and_total() {
1188 let test = test_app().await;
1189 create_namespace_in_test(&test.state, "digest-page-ns").await;
1190
1191 {
1192 let state = test.state.read().await;
1193 let namespace = state
1194 .namespace_repo
1195 .get_by_name("digest-page-ns")
1196 .await
1197 .unwrap()
1198 .unwrap();
1199
1200 for idx in 0..3 {
1201 let content = format!("digest memory {idx}");
1202 let memory = state
1203 .memory_repo
1204 .store(nexus_storage::StoreMemoryParams {
1205 namespace_id: namespace.id,
1206 content: &content,
1207 category: &nexus_core::MemoryCategory::Session,
1208 memory_lane_type: None,
1209 labels: &[],
1210 metadata: &serde_json::json!({}),
1211 embedding: None,
1212 embedding_model: None,
1213 })
1214 .await
1215 .unwrap();
1216
1217 state
1218 .memory_repo
1219 .store_digest(nexus_storage::StoreDigestParams {
1220 namespace_id: namespace.id,
1221 session_key: "digest-session",
1222 digest_kind: if idx % 2 == 0 {
1223 "summary_short"
1224 } else {
1225 "summary_long"
1226 },
1227 memory_id: memory.id,
1228 start_memory_id: Some(memory.id),
1229 end_memory_id: Some(memory.id),
1230 token_count: 100 + idx,
1231 })
1232 .await
1233 .unwrap();
1234 }
1235 }
1236
1237 let resp = test
1238 .app
1239 .oneshot(
1240 Request::builder()
1241 .uri("/api/cognition/digests?namespace=digest-page-ns&limit=2")
1242 .body(Body::empty())
1243 .unwrap(),
1244 )
1245 .await
1246 .unwrap();
1247
1248 assert_eq!(resp.status(), StatusCode::OK);
1249 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1250 .await
1251 .unwrap();
1252 let json = body_to_json(body);
1253
1254 assert_eq!(json["success"], true);
1255 assert_eq!(json["digests"].as_array().unwrap().len(), 2);
1256 assert_eq!(json["total"], 3);
1257 }
1258
/// Serializes a `JobEntry` to JSON and checks that `id`, `job_type` and
/// `lease_owner` keep their values and that a `None` `last_error` becomes
/// JSON null.
#[test]
fn test_job_entry_serialization() {
    let timestamp = "2026-01-01T00:00:00Z";
    let entry = JobEntry {
        id: 1,
        job_type: String::from("derive"),
        status: String::from("pending"),
        priority: 10,
        attempts: 0,
        last_error: None,
        lease_owner: Some(String::from("worker-1")),
        lease_expires_at: None,
        created_at: timestamp.to_string(),
        updated_at: timestamp.to_string(),
    };

    let value = serde_json::to_value(&entry).unwrap();

    assert_eq!(value["id"], 1);
    assert_eq!(value["job_type"], "derive");
    assert_eq!(value["lease_owner"], "worker-1");
    assert!(value["last_error"].is_null());
}
1279
/// Serializes a `DigestEntry` to JSON and checks that `session_key`,
/// `memory_id` and `token_count` appear under their field names with the
/// values they were built with.
#[test]
fn test_digest_entry_serialization() {
    let entry = DigestEntry {
        id: 1,
        session_key: String::from("sess-1"),
        digest_kind: String::from("summary_short"),
        memory_id: 42,
        start_memory_id: Some(1),
        end_memory_id: Some(10),
        token_count: 200,
        created_at: String::from("2026-01-01T00:00:00Z"),
    };

    let value = serde_json::to_value(&entry).unwrap();

    assert_eq!(value["session_key"], "sess-1");
    assert_eq!(value["memory_id"], 42);
    assert_eq!(value["token_count"], 200);
}
1297
1298 #[tokio::test]
1299 async fn test_overview_with_enqueued_job_and_evidence() {
1300 let test = test_app().await;
1301 create_namespace_in_test(&test.state, "data-test-ns").await;
1302
1303 {
1304 let s = test.state.read().await;
1305 let ns = s
1306 .namespace_repo
1307 .get_by_name("data-test-ns")
1308 .await
1309 .unwrap()
1310 .expect("namespace exists");
1311
1312 let mem_id = s
1313 .memory_repo
1314 .store(nexus_storage::StoreMemoryParams {
1315 namespace_id: ns.id,
1316 content: "test memory for evidence",
1317 category: &nexus_core::MemoryCategory::Session,
1318 memory_lane_type: None,
1319 labels: &[],
1320 metadata: &serde_json::json!({}),
1321 embedding: None,
1322 embedding_model: None,
1323 })
1324 .await
1325 .unwrap();
1326
1327 s.memory_repo
1328 .enqueue_job(nexus_storage::EnqueueJobParams {
1329 namespace_id: ns.id,
1330 job_type: "derive",
1331 priority: 5,
1332 perspective: None,
1333 payload: &serde_json::json!({"test": true}),
1334 })
1335 .await
1336 .unwrap();
1337
1338 s.memory_repo
1339 .store_with_lineage(nexus_storage::StoreMemoryWithLineageParams {
1340 store: nexus_storage::StoreMemoryParams {
1341 namespace_id: ns.id,
1342 content: "derived from evidence",
1343 category: &nexus_core::MemoryCategory::Facts,
1344 memory_lane_type: None,
1345 labels: &[],
1346 metadata: &serde_json::json!({"cognitive": {"level": "derived"}}),
1347 embedding: None,
1348 embedding_model: None,
1349 },
1350 source_memory_ids: &[mem_id.id],
1351 evidence_role: "source",
1352 })
1353 .await
1354 .unwrap();
1355
1356 let _ = mem_id;
1357 }
1358
1359 let resp = test
1360 .app
1361 .oneshot(
1362 Request::builder()
1363 .uri("/api/cognition/overview?namespace=data-test-ns")
1364 .body(Body::empty())
1365 .unwrap(),
1366 )
1367 .await
1368 .unwrap();
1369
1370 assert_eq!(resp.status(), StatusCode::OK);
1371 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1372 .await
1373 .unwrap();
1374 let json = body_to_json(body);
1375
1376 assert_eq!(json["success"], true);
1377 assert_eq!(json["jobs_by_status"]["pending"], 1);
1378 assert!(json["evidence_count"].as_i64().unwrap() >= 1);
1379 }
1380
1381 #[tokio::test]
1384 async fn test_dashboard_missing_namespace_returns_400() {
1385 let test = test_app().await;
1386 let resp = test
1387 .app
1388 .oneshot(
1389 Request::builder()
1390 .uri("/api/cognition/dashboard")
1391 .body(Body::empty())
1392 .unwrap(),
1393 )
1394 .await
1395 .unwrap();
1396 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1397 }
1398
1399 #[tokio::test]
1400 async fn test_dashboard_unknown_namespace_returns_404() {
1401 let test = test_app().await;
1402 let resp = test
1403 .app
1404 .oneshot(
1405 Request::builder()
1406 .uri("/api/cognition/dashboard?namespace=nonexistent")
1407 .body(Body::empty())
1408 .unwrap(),
1409 )
1410 .await
1411 .unwrap();
1412 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1413 }
1414
1415 #[tokio::test]
1416 async fn test_dashboard_returns_all_sections() {
1417 let test = test_app().await;
1418 create_namespace_in_test(&test.state, "dash-empty-ns").await;
1419
1420 let resp = test
1421 .app
1422 .oneshot(
1423 Request::builder()
1424 .uri("/api/cognition/dashboard?namespace=dash-empty-ns")
1425 .body(Body::empty())
1426 .unwrap(),
1427 )
1428 .await
1429 .unwrap();
1430
1431 assert_eq!(resp.status(), StatusCode::OK);
1432 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1433 .await
1434 .unwrap();
1435 let json = body_to_json(body);
1436
1437 assert_eq!(json["success"], true);
1438 assert_eq!(json["namespace"], "dash-empty-ns");
1439
1440 assert_eq!(json["dream"]["completed_reflections"], 0);
1442 assert_eq!(json["dream"]["completed_digests"], 0);
1443 assert_eq!(json["dream"]["failed_jobs"], 0);
1444 assert_eq!(json["dream"]["pending_jobs"], 0);
1445 assert!(json["dream"]["last_dream_at"].is_null());
1446
1447 assert_eq!(json["digest"]["total_digests"], 0);
1449 assert_eq!(json["digest"]["sessions_with_cognition"], 0);
1450 assert!(json["digest"]["latest_digest_at"].is_null());
1451 assert!(json["digest"]["latest_digest_age_seconds"].is_null());
1452
1453 assert_eq!(json["recall"]["raw"], 0);
1455 assert_eq!(json["recall"]["explicit"], 0);
1456 assert_eq!(json["recall"]["contradiction"], 0);
1457 assert_eq!(json["recall"]["total"], 0);
1458
1459 assert!(json["adaptive"]["enabled"].is_boolean());
1461 assert!(json["adaptive"]["current_interval_secs"].as_u64().is_some());
1462 assert!(json["adaptive"]["contradiction_density"].is_number());
1463 }
1464
1465 #[tokio::test]
1466 async fn test_dashboard_populates_recall_and_dream_from_data() {
1467 let test = test_app().await;
1468 create_namespace_in_test(&test.state, "dash-data-ns").await;
1469
1470 {
1471 let state = test.state.read().await;
1472 let namespace = state
1473 .namespace_repo
1474 .get_by_name("dash-data-ns")
1475 .await
1476 .unwrap()
1477 .unwrap();
1478
1479 for (content, level) in [
1481 ("raw event", CognitiveLevel::Raw),
1482 ("explicit fact", CognitiveLevel::Explicit),
1483 ("derived insight", CognitiveLevel::Derived),
1484 ("contradiction note", CognitiveLevel::Contradiction),
1485 ] {
1486 state
1487 .memory_repo
1488 .store(nexus_storage::repository::StoreMemoryParams {
1489 namespace_id: namespace.id,
1490 content,
1491 category: &nexus_core::MemoryCategory::Facts,
1492 memory_lane_type: None,
1493 labels: &[],
1494 metadata: &serde_json::json!({
1495 "cognitive": {
1496 "level": level.as_str(),
1497 "observer": "claude-code",
1498 "subject": "claude-code",
1499 "confidence": 0.9,
1500 "generated_by": "test"
1501 }
1502 }),
1503 embedding: None,
1504 embedding_model: None,
1505 })
1506 .await
1507 .unwrap();
1508 }
1509 }
1510
1511 let resp = test
1512 .app
1513 .oneshot(
1514 Request::builder()
1515 .uri("/api/cognition/dashboard?namespace=dash-data-ns")
1516 .body(Body::empty())
1517 .unwrap(),
1518 )
1519 .await
1520 .unwrap();
1521
1522 assert_eq!(resp.status(), StatusCode::OK);
1523 let body = axum::body::to_bytes(resp.into_body(), 1_000_000)
1524 .await
1525 .unwrap();
1526 let json = body_to_json(body);
1527
1528 assert_eq!(json["recall"]["raw"], 1);
1529 assert_eq!(json["recall"]["explicit"], 1);
1530 assert_eq!(json["recall"]["derived"], 1);
1531 assert_eq!(json["recall"]["contradiction"], 1);
1532 assert_eq!(json["recall"]["total"], 4);
1533 assert!((json["adaptive"]["contradiction_density"].as_f64().unwrap() - 0.25).abs() < 0.01);
1535 }
1536
/// Builds a fully populated `DashboardResponse`, serializes it to JSON,
/// spot-checks a field from several sections, then deserializes the JSON
/// back and verifies that the values survived the round trip.
#[test]
fn test_dashboard_response_serialization_roundtrip() {
    let dream = DreamState {
        completed_reflections: 5,
        completed_digests: 3,
        failed_jobs: 1,
        pending_jobs: 2,
        last_dream_at: Some(String::from("2026-03-27T12:00:00Z")),
    };
    let digest = DigestFreshnessState {
        total_digests: 10,
        sessions_with_cognition: 4,
        latest_digest_age_seconds: Some(3600),
        latest_digest_at: Some(String::from("2026-03-27T11:00:00Z")),
    };
    let recall = RecallComposition {
        raw: 50,
        explicit: 30,
        derived: 10,
        summary_short: 5,
        summary_long: 3,
        contradiction: 2,
        total: 100,
    };
    let adaptive = AdaptiveDreamState {
        enabled: true,
        current_interval_secs: 120,
        min_interval_secs: 60,
        max_interval_secs: 600,
        contradiction_count: 2,
        contradiction_density: 0.02,
    };
    let dashboard = DashboardResponse {
        success: true,
        namespace: String::from("test"),
        dream,
        digest,
        recall,
        adaptive,
    };

    // Serialize and spot-check the JSON shape.
    let encoded = serde_json::to_value(&dashboard).unwrap();
    assert_eq!(encoded["success"], true);
    assert_eq!(encoded["dream"]["completed_reflections"], 5);
    assert_eq!(encoded["recall"]["total"], 100);
    assert_eq!(encoded["adaptive"]["enabled"], true);
    assert_eq!(encoded["adaptive"]["current_interval_secs"], 120);

    // Deserialize from the same JSON value and confirm the data is intact.
    let restored = serde_json::from_value::<DashboardResponse>(encoded).unwrap();
    assert_eq!(restored.dream.completed_reflections, 5);
    assert_eq!(restored.recall.total, 100);
}
1585}