1use crate::error::{CrvError, CrvResult};
14use crate::stage_i::StageIEncoder;
15use crate::stage_ii::StageIIEncoder;
16use crate::stage_iii::StageIIIEncoder;
17use crate::stage_iv::StageIVEncoder;
18use crate::stage_v::StageVEngine;
19use crate::stage_vi::StageVIModeler;
20use crate::types::*;
21use ruvector_gnn::search::cosine_similarity;
22use std::collections::HashMap;
23
24#[derive(Debug, Clone)]
26struct SessionEntry {
27 embedding: Vec<f32>,
29 stage: u8,
31 entry_index: usize,
33 metadata: HashMap<String, serde_json::Value>,
35 timestamp_ms: u64,
37}
38
39#[derive(Debug)]
41struct Session {
42 id: SessionId,
44 coordinate: TargetCoordinate,
46 entries: Vec<SessionEntry>,
48}
49
50#[derive(Debug)]
52pub struct CrvSessionManager {
53 config: CrvConfig,
55 stage_i: StageIEncoder,
57 stage_ii: StageIIEncoder,
59 stage_iii: StageIIIEncoder,
61 stage_iv: StageIVEncoder,
63 stage_v: StageVEngine,
65 stage_vi: StageVIModeler,
67 sessions: HashMap<SessionId, Session>,
69}
70
71impl CrvSessionManager {
72 pub fn new(config: CrvConfig) -> Self {
74 let stage_i = StageIEncoder::new(&config);
75 let stage_ii = StageIIEncoder::new(&config);
76 let stage_iii = StageIIIEncoder::new(&config);
77 let stage_iv = StageIVEncoder::new(&config);
78 let stage_v = StageVEngine::new(&config);
79 let stage_vi = StageVIModeler::new(&config);
80
81 Self {
82 config,
83 stage_i,
84 stage_ii,
85 stage_iii,
86 stage_iv,
87 stage_v,
88 stage_vi,
89 sessions: HashMap::new(),
90 }
91 }
92
93 pub fn create_session(
95 &mut self,
96 session_id: SessionId,
97 coordinate: TargetCoordinate,
98 ) -> CrvResult<()> {
99 if self.sessions.contains_key(&session_id) {
100 return Err(CrvError::EncodingError(format!(
101 "Session {} already exists",
102 session_id
103 )));
104 }
105
106 self.sessions.insert(
107 session_id.clone(),
108 Session {
109 id: session_id,
110 coordinate,
111 entries: Vec::new(),
112 },
113 );
114
115 Ok(())
116 }
117
118 pub fn add_stage_i(
120 &mut self,
121 session_id: &str,
122 data: &StageIData,
123 ) -> CrvResult<Vec<f32>> {
124 let embedding = self.stage_i.encode(data)?;
125 self.add_entry(session_id, 1, embedding.clone(), HashMap::new())?;
126 Ok(embedding)
127 }
128
129 pub fn add_stage_ii(
131 &mut self,
132 session_id: &str,
133 data: &StageIIData,
134 ) -> CrvResult<Vec<f32>> {
135 let embedding = self.stage_ii.encode(data)?;
136 self.add_entry(session_id, 2, embedding.clone(), HashMap::new())?;
137 Ok(embedding)
138 }
139
140 pub fn add_stage_iii(
142 &mut self,
143 session_id: &str,
144 data: &StageIIIData,
145 ) -> CrvResult<Vec<f32>> {
146 let embedding = self.stage_iii.encode(data)?;
147 self.add_entry(session_id, 3, embedding.clone(), HashMap::new())?;
148 Ok(embedding)
149 }
150
151 pub fn add_stage_iv(
153 &mut self,
154 session_id: &str,
155 data: &StageIVData,
156 ) -> CrvResult<Vec<f32>> {
157 let embedding = self.stage_iv.encode(data)?;
158 self.add_entry(session_id, 4, embedding.clone(), HashMap::new())?;
159 Ok(embedding)
160 }
161
162 pub fn run_stage_v(
166 &mut self,
167 session_id: &str,
168 probe_queries: &[(&str, u8, Vec<f32>)], k: usize,
170 ) -> CrvResult<StageVData> {
171 let session = self
172 .sessions
173 .get(session_id)
174 .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
175
176 let all_embeddings: Vec<Vec<f32>> =
177 session.entries.iter().map(|e| e.embedding.clone()).collect();
178
179 let mut probes = Vec::new();
180 let mut cross_refs = Vec::new();
181
182 for (query_text, target_stage, query_emb) in probe_queries {
183 let stage_entries: Vec<Vec<f32>> = session
185 .entries
186 .iter()
187 .filter(|e| e.stage == *target_stage)
188 .map(|e| e.embedding.clone())
189 .collect();
190
191 if stage_entries.is_empty() {
192 continue;
193 }
194
195 let mut probe = self.stage_v.probe(query_emb, &stage_entries, k)?;
196 probe.query = query_text.to_string();
197 probe.target_stage = *target_stage;
198 probes.push(probe);
199 }
200
201 for from_stage in 1..=4u8 {
203 for to_stage in (from_stage + 1)..=4u8 {
204 let from_entries: Vec<Vec<f32>> = session
205 .entries
206 .iter()
207 .filter(|e| e.stage == from_stage)
208 .map(|e| e.embedding.clone())
209 .collect();
210 let to_entries: Vec<Vec<f32>> = session
211 .entries
212 .iter()
213 .filter(|e| e.stage == to_stage)
214 .map(|e| e.embedding.clone())
215 .collect();
216
217 if !from_entries.is_empty() && !to_entries.is_empty() {
218 let refs = self.stage_v.cross_reference(
219 from_stage,
220 &from_entries,
221 to_stage,
222 &to_entries,
223 self.config.convergence_threshold,
224 );
225 cross_refs.extend(refs);
226 }
227 }
228 }
229
230 let stage_v_data = StageVData {
231 probes,
232 cross_references: cross_refs,
233 };
234
235 if !stage_v_data.probes.is_empty() {
237 let embedding = self.stage_v.encode(&stage_v_data, &all_embeddings)?;
238 self.add_entry(session_id, 5, embedding, HashMap::new())?;
239 }
240
241 Ok(stage_v_data)
242 }
243
244 pub fn run_stage_vi(&mut self, session_id: &str) -> CrvResult<StageVIData> {
246 let session = self
247 .sessions
248 .get(session_id)
249 .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
250
251 let embeddings: Vec<Vec<f32>> =
252 session.entries.iter().map(|e| e.embedding.clone()).collect();
253 let labels: Vec<(u8, usize)> = session
254 .entries
255 .iter()
256 .map(|e| (e.stage, e.entry_index))
257 .collect();
258
259 let stage_vi_data = self.stage_vi.partition(&embeddings, &labels)?;
260
261 let embedding = self.stage_vi.encode(&stage_vi_data)?;
263 self.add_entry(session_id, 6, embedding, HashMap::new())?;
264
265 Ok(stage_vi_data)
266 }
267
268 pub fn find_convergence(
274 &self,
275 coordinate: &str,
276 min_similarity: f32,
277 ) -> CrvResult<ConvergenceResult> {
278 let relevant_sessions: Vec<&Session> = self
280 .sessions
281 .values()
282 .filter(|s| s.coordinate == coordinate)
283 .collect();
284
285 if relevant_sessions.len() < 2 {
286 return Err(CrvError::EmptyInput(
287 "Need at least 2 sessions for convergence analysis".to_string(),
288 ));
289 }
290
291 let mut session_pairs = Vec::new();
292 let mut scores = Vec::new();
293 let mut convergent_stages = Vec::new();
294
295 for i in 0..relevant_sessions.len() {
297 for j in (i + 1)..relevant_sessions.len() {
298 let sess_a = relevant_sessions[i];
299 let sess_b = relevant_sessions[j];
300
301 for stage in 1..=6u8 {
303 let entries_a: Vec<&[f32]> = sess_a
304 .entries
305 .iter()
306 .filter(|e| e.stage == stage)
307 .map(|e| e.embedding.as_slice())
308 .collect();
309 let entries_b: Vec<&[f32]> = sess_b
310 .entries
311 .iter()
312 .filter(|e| e.stage == stage)
313 .map(|e| e.embedding.as_slice())
314 .collect();
315
316 if entries_a.is_empty() || entries_b.is_empty() {
317 continue;
318 }
319
320 for emb_a in &entries_a {
322 for emb_b in &entries_b {
323 if emb_a.len() == emb_b.len() && !emb_a.is_empty() {
324 let sim = cosine_similarity(emb_a, emb_b);
325 if sim >= min_similarity {
326 session_pairs
327 .push((sess_a.id.clone(), sess_b.id.clone()));
328 scores.push(sim);
329 if !convergent_stages.contains(&stage) {
330 convergent_stages.push(stage);
331 }
332 }
333 }
334 }
335 }
336 }
337 }
338 }
339
340 let consensus_embedding = if !scores.is_empty() {
342 let mut consensus = vec![0.0f32; self.config.dimensions];
343 let mut count = 0usize;
344
345 for session in &relevant_sessions {
346 for entry in &session.entries {
347 if convergent_stages.contains(&entry.stage) {
348 for (i, &v) in entry.embedding.iter().enumerate() {
349 if i < self.config.dimensions {
350 consensus[i] += v;
351 }
352 }
353 count += 1;
354 }
355 }
356 }
357
358 if count > 0 {
359 for v in &mut consensus {
360 *v /= count as f32;
361 }
362 Some(consensus)
363 } else {
364 None
365 }
366 } else {
367 None
368 };
369
370 convergent_stages.sort();
372
373 Ok(ConvergenceResult {
374 session_pairs,
375 scores,
376 convergent_stages,
377 consensus_embedding,
378 })
379 }
380
381 pub fn get_session_embeddings(&self, session_id: &str) -> CrvResult<Vec<CrvSessionEntry>> {
383 let session = self
384 .sessions
385 .get(session_id)
386 .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
387
388 Ok(session
389 .entries
390 .iter()
391 .map(|e| CrvSessionEntry {
392 session_id: session.id.clone(),
393 coordinate: session.coordinate.clone(),
394 stage: e.stage,
395 embedding: e.embedding.clone(),
396 metadata: e.metadata.clone(),
397 timestamp_ms: e.timestamp_ms,
398 })
399 .collect())
400 }
401
402 pub fn session_entry_count(&self, session_id: &str) -> usize {
404 self.sessions
405 .get(session_id)
406 .map(|s| s.entries.len())
407 .unwrap_or(0)
408 }
409
410 pub fn session_count(&self) -> usize {
412 self.sessions.len()
413 }
414
415 pub fn remove_session(&mut self, session_id: &str) -> bool {
417 self.sessions.remove(session_id).is_some()
418 }
419
420 pub fn stage_i_encoder(&self) -> &StageIEncoder {
422 &self.stage_i
423 }
424
425 pub fn stage_ii_encoder(&self) -> &StageIIEncoder {
427 &self.stage_ii
428 }
429
430 pub fn stage_iv_encoder(&self) -> &StageIVEncoder {
432 &self.stage_iv
433 }
434
435 pub fn stage_v_engine(&self) -> &StageVEngine {
437 &self.stage_v
438 }
439
440 pub fn stage_vi_modeler(&self) -> &StageVIModeler {
442 &self.stage_vi
443 }
444
445 fn add_entry(
447 &mut self,
448 session_id: &str,
449 stage: u8,
450 embedding: Vec<f32>,
451 metadata: HashMap<String, serde_json::Value>,
452 ) -> CrvResult<()> {
453 let session = self
454 .sessions
455 .get_mut(session_id)
456 .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
457
458 let entry_index = session.entries.iter().filter(|e| e.stage == stage).count();
459
460 session.entries.push(SessionEntry {
461 embedding,
462 stage,
463 entry_index,
464 metadata,
465 timestamp_ms: 0,
466 });
467
468 Ok(())
469 }
470}
471
472#[cfg(test)]
473mod tests {
474 use super::*;
475
476 fn test_config() -> CrvConfig {
477 CrvConfig {
478 dimensions: 32,
479 convergence_threshold: 0.5,
480 ..CrvConfig::default()
481 }
482 }
483
484 #[test]
485 fn test_session_creation() {
486 let config = test_config();
487 let mut manager = CrvSessionManager::new(config);
488
489 manager
490 .create_session("sess-1".to_string(), "1234-5678".to_string())
491 .unwrap();
492 assert_eq!(manager.session_count(), 1);
493 assert_eq!(manager.session_entry_count("sess-1"), 0);
494 }
495
496 #[test]
497 fn test_add_stage_i() {
498 let config = test_config();
499 let mut manager = CrvSessionManager::new(config);
500
501 manager
502 .create_session("sess-1".to_string(), "1234-5678".to_string())
503 .unwrap();
504
505 let data = StageIData {
506 stroke: vec![(0.0, 0.0), (1.0, 1.0), (2.0, 0.0)],
507 spontaneous_descriptor: "angular".to_string(),
508 classification: GestaltType::Manmade,
509 confidence: 0.9,
510 };
511
512 let emb = manager.add_stage_i("sess-1", &data).unwrap();
513 assert_eq!(emb.len(), 32);
514 assert_eq!(manager.session_entry_count("sess-1"), 1);
515 }
516
517 #[test]
518 fn test_add_stage_ii() {
519 let config = test_config();
520 let mut manager = CrvSessionManager::new(config);
521
522 manager
523 .create_session("sess-1".to_string(), "coord-1".to_string())
524 .unwrap();
525
526 let data = StageIIData {
527 impressions: vec![
528 (SensoryModality::Texture, "rough".to_string()),
529 (SensoryModality::Color, "gray".to_string()),
530 ],
531 feature_vector: None,
532 };
533
534 let emb = manager.add_stage_ii("sess-1", &data).unwrap();
535 assert_eq!(emb.len(), 32);
536 }
537
538 #[test]
539 fn test_full_session_flow() {
540 let config = test_config();
541 let mut manager = CrvSessionManager::new(config);
542
543 manager
544 .create_session("sess-1".to_string(), "coord-1".to_string())
545 .unwrap();
546
547 let s1 = StageIData {
549 stroke: vec![(0.0, 0.0), (1.0, 1.0), (2.0, 0.0)],
550 spontaneous_descriptor: "angular".to_string(),
551 classification: GestaltType::Manmade,
552 confidence: 0.9,
553 };
554 manager.add_stage_i("sess-1", &s1).unwrap();
555
556 let s2 = StageIIData {
558 impressions: vec![
559 (SensoryModality::Texture, "rough stone".to_string()),
560 (SensoryModality::Temperature, "cold".to_string()),
561 ],
562 feature_vector: None,
563 };
564 manager.add_stage_ii("sess-1", &s2).unwrap();
565
566 let s4 = StageIVData {
568 emotional_impact: vec![("solemn".to_string(), 0.6)],
569 tangibles: vec!["stone blocks".to_string()],
570 intangibles: vec!["ancient".to_string()],
571 aol_detections: vec![],
572 };
573 manager.add_stage_iv("sess-1", &s4).unwrap();
574
575 assert_eq!(manager.session_entry_count("sess-1"), 3);
576
577 let entries = manager.get_session_embeddings("sess-1").unwrap();
579 assert_eq!(entries.len(), 3);
580 assert_eq!(entries[0].stage, 1);
581 assert_eq!(entries[1].stage, 2);
582 assert_eq!(entries[2].stage, 4);
583 }
584
585 #[test]
586 fn test_duplicate_session() {
587 let config = test_config();
588 let mut manager = CrvSessionManager::new(config);
589
590 manager
591 .create_session("sess-1".to_string(), "coord-1".to_string())
592 .unwrap();
593
594 let result = manager.create_session("sess-1".to_string(), "coord-2".to_string());
595 assert!(result.is_err());
596 }
597
598 #[test]
599 fn test_session_not_found() {
600 let config = test_config();
601 let mut manager = CrvSessionManager::new(config);
602
603 let s1 = StageIData {
604 stroke: vec![(0.0, 0.0), (1.0, 1.0)],
605 spontaneous_descriptor: "test".to_string(),
606 classification: GestaltType::Natural,
607 confidence: 0.5,
608 };
609
610 let result = manager.add_stage_i("nonexistent", &s1);
611 assert!(result.is_err());
612 }
613
614 #[test]
615 fn test_remove_session() {
616 let config = test_config();
617 let mut manager = CrvSessionManager::new(config);
618
619 manager
620 .create_session("sess-1".to_string(), "coord-1".to_string())
621 .unwrap();
622 assert_eq!(manager.session_count(), 1);
623
624 assert!(manager.remove_session("sess-1"));
625 assert_eq!(manager.session_count(), 0);
626
627 assert!(!manager.remove_session("sess-1"));
628 }
629}