Skip to main content

ruvector_crv/
session.rs

1//! CRV Session Manager
2//!
3//! Manages CRV sessions as directed acyclic graphs (DAGs), where each session
4//! progresses through stages I-VI. Provides cross-session convergence analysis
5//! to find agreement between multiple viewers targeting the same coordinate.
6//!
7//! # Architecture
8//!
9//! Each session is a DAG of stage entries. Cross-session convergence is computed
10//! by finding entries with high embedding similarity across different sessions
11//! targeting the same coordinate.
12
13use crate::error::{CrvError, CrvResult};
14use crate::stage_i::StageIEncoder;
15use crate::stage_ii::StageIIEncoder;
16use crate::stage_iii::StageIIIEncoder;
17use crate::stage_iv::StageIVEncoder;
18use crate::stage_v::StageVEngine;
19use crate::stage_vi::StageVIModeler;
20use crate::types::*;
21use ruvector_gnn::search::cosine_similarity;
22use std::collections::HashMap;
23
24/// A session entry stored in the session graph.
25#[derive(Debug, Clone)]
26struct SessionEntry {
27    /// The stage data embedding.
28    embedding: Vec<f32>,
29    /// Stage number (1-6).
30    stage: u8,
31    /// Entry index within the stage.
32    entry_index: usize,
33    /// Metadata.
34    metadata: HashMap<String, serde_json::Value>,
35    /// Timestamp.
36    timestamp_ms: u64,
37}
38
39/// A complete CRV session with all stage data.
40#[derive(Debug)]
41struct Session {
42    /// Session identifier.
43    id: SessionId,
44    /// Target coordinate.
45    coordinate: TargetCoordinate,
46    /// Entries organized by stage.
47    entries: Vec<SessionEntry>,
48}
49
50/// CRV Session Manager: coordinates all stage encoders and manages sessions.
51#[derive(Debug)]
52pub struct CrvSessionManager {
53    /// Configuration.
54    config: CrvConfig,
55    /// Stage I encoder.
56    stage_i: StageIEncoder,
57    /// Stage II encoder.
58    stage_ii: StageIIEncoder,
59    /// Stage III encoder.
60    stage_iii: StageIIIEncoder,
61    /// Stage IV encoder.
62    stage_iv: StageIVEncoder,
63    /// Stage V engine.
64    stage_v: StageVEngine,
65    /// Stage VI modeler.
66    stage_vi: StageVIModeler,
67    /// Active sessions indexed by session ID.
68    sessions: HashMap<SessionId, Session>,
69}
70
71impl CrvSessionManager {
72    /// Create a new session manager with the given configuration.
73    pub fn new(config: CrvConfig) -> Self {
74        let stage_i = StageIEncoder::new(&config);
75        let stage_ii = StageIIEncoder::new(&config);
76        let stage_iii = StageIIIEncoder::new(&config);
77        let stage_iv = StageIVEncoder::new(&config);
78        let stage_v = StageVEngine::new(&config);
79        let stage_vi = StageVIModeler::new(&config);
80
81        Self {
82            config,
83            stage_i,
84            stage_ii,
85            stage_iii,
86            stage_iv,
87            stage_v,
88            stage_vi,
89            sessions: HashMap::new(),
90        }
91    }
92
93    /// Create a new session for a given target coordinate.
94    pub fn create_session(
95        &mut self,
96        session_id: SessionId,
97        coordinate: TargetCoordinate,
98    ) -> CrvResult<()> {
99        if self.sessions.contains_key(&session_id) {
100            return Err(CrvError::EncodingError(format!(
101                "Session {} already exists",
102                session_id
103            )));
104        }
105
106        self.sessions.insert(
107            session_id.clone(),
108            Session {
109                id: session_id,
110                coordinate,
111                entries: Vec::new(),
112            },
113        );
114
115        Ok(())
116    }
117
118    /// Add Stage I data to a session.
119    pub fn add_stage_i(
120        &mut self,
121        session_id: &str,
122        data: &StageIData,
123    ) -> CrvResult<Vec<f32>> {
124        let embedding = self.stage_i.encode(data)?;
125        self.add_entry(session_id, 1, embedding.clone(), HashMap::new())?;
126        Ok(embedding)
127    }
128
129    /// Add Stage II data to a session.
130    pub fn add_stage_ii(
131        &mut self,
132        session_id: &str,
133        data: &StageIIData,
134    ) -> CrvResult<Vec<f32>> {
135        let embedding = self.stage_ii.encode(data)?;
136        self.add_entry(session_id, 2, embedding.clone(), HashMap::new())?;
137        Ok(embedding)
138    }
139
140    /// Add Stage III data to a session.
141    pub fn add_stage_iii(
142        &mut self,
143        session_id: &str,
144        data: &StageIIIData,
145    ) -> CrvResult<Vec<f32>> {
146        let embedding = self.stage_iii.encode(data)?;
147        self.add_entry(session_id, 3, embedding.clone(), HashMap::new())?;
148        Ok(embedding)
149    }
150
151    /// Add Stage IV data to a session.
152    pub fn add_stage_iv(
153        &mut self,
154        session_id: &str,
155        data: &StageIVData,
156    ) -> CrvResult<Vec<f32>> {
157        let embedding = self.stage_iv.encode(data)?;
158        self.add_entry(session_id, 4, embedding.clone(), HashMap::new())?;
159        Ok(embedding)
160    }
161
162    /// Run Stage V interrogation on a session.
163    ///
164    /// Probes the accumulated session data with specified queries.
165    pub fn run_stage_v(
166        &mut self,
167        session_id: &str,
168        probe_queries: &[(&str, u8, Vec<f32>)], // (query text, target stage, query embedding)
169        k: usize,
170    ) -> CrvResult<StageVData> {
171        let session = self
172            .sessions
173            .get(session_id)
174            .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
175
176        let all_embeddings: Vec<Vec<f32>> =
177            session.entries.iter().map(|e| e.embedding.clone()).collect();
178
179        let mut probes = Vec::new();
180        let mut cross_refs = Vec::new();
181
182        for (query_text, target_stage, query_emb) in probe_queries {
183            // Filter candidates to the target stage
184            let stage_entries: Vec<Vec<f32>> = session
185                .entries
186                .iter()
187                .filter(|e| e.stage == *target_stage)
188                .map(|e| e.embedding.clone())
189                .collect();
190
191            if stage_entries.is_empty() {
192                continue;
193            }
194
195            let mut probe = self.stage_v.probe(query_emb, &stage_entries, k)?;
196            probe.query = query_text.to_string();
197            probe.target_stage = *target_stage;
198            probes.push(probe);
199        }
200
201        // Cross-reference between all stage pairs
202        for from_stage in 1..=4u8 {
203            for to_stage in (from_stage + 1)..=4u8 {
204                let from_entries: Vec<Vec<f32>> = session
205                    .entries
206                    .iter()
207                    .filter(|e| e.stage == from_stage)
208                    .map(|e| e.embedding.clone())
209                    .collect();
210                let to_entries: Vec<Vec<f32>> = session
211                    .entries
212                    .iter()
213                    .filter(|e| e.stage == to_stage)
214                    .map(|e| e.embedding.clone())
215                    .collect();
216
217                if !from_entries.is_empty() && !to_entries.is_empty() {
218                    let refs = self.stage_v.cross_reference(
219                        from_stage,
220                        &from_entries,
221                        to_stage,
222                        &to_entries,
223                        self.config.convergence_threshold,
224                    );
225                    cross_refs.extend(refs);
226                }
227            }
228        }
229
230        let stage_v_data = StageVData {
231            probes,
232            cross_references: cross_refs,
233        };
234
235        // Encode Stage V result and add to session
236        if !stage_v_data.probes.is_empty() {
237            let embedding = self.stage_v.encode(&stage_v_data, &all_embeddings)?;
238            self.add_entry(session_id, 5, embedding, HashMap::new())?;
239        }
240
241        Ok(stage_v_data)
242    }
243
244    /// Run Stage VI composite modeling on a session.
245    pub fn run_stage_vi(&mut self, session_id: &str) -> CrvResult<StageVIData> {
246        let session = self
247            .sessions
248            .get(session_id)
249            .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
250
251        let embeddings: Vec<Vec<f32>> =
252            session.entries.iter().map(|e| e.embedding.clone()).collect();
253        let labels: Vec<(u8, usize)> = session
254            .entries
255            .iter()
256            .map(|e| (e.stage, e.entry_index))
257            .collect();
258
259        let stage_vi_data = self.stage_vi.partition(&embeddings, &labels)?;
260
261        // Encode Stage VI result and add to session
262        let embedding = self.stage_vi.encode(&stage_vi_data)?;
263        self.add_entry(session_id, 6, embedding, HashMap::new())?;
264
265        Ok(stage_vi_data)
266    }
267
268    /// Find convergence across multiple sessions targeting the same coordinate.
269    ///
270    /// This is the core multi-viewer matching operation: given sessions from
271    /// different viewers targeting the same coordinate, find which aspects
272    /// of their signal line data converge (agree).
273    pub fn find_convergence(
274        &self,
275        coordinate: &str,
276        min_similarity: f32,
277    ) -> CrvResult<ConvergenceResult> {
278        // Collect all sessions for this coordinate
279        let relevant_sessions: Vec<&Session> = self
280            .sessions
281            .values()
282            .filter(|s| s.coordinate == coordinate)
283            .collect();
284
285        if relevant_sessions.len() < 2 {
286            return Err(CrvError::EmptyInput(
287                "Need at least 2 sessions for convergence analysis".to_string(),
288            ));
289        }
290
291        let mut session_pairs = Vec::new();
292        let mut scores = Vec::new();
293        let mut convergent_stages = Vec::new();
294
295        // Compare all pairs of sessions
296        for i in 0..relevant_sessions.len() {
297            for j in (i + 1)..relevant_sessions.len() {
298                let sess_a = relevant_sessions[i];
299                let sess_b = relevant_sessions[j];
300
301                // Compare stage-by-stage
302                for stage in 1..=6u8 {
303                    let entries_a: Vec<&[f32]> = sess_a
304                        .entries
305                        .iter()
306                        .filter(|e| e.stage == stage)
307                        .map(|e| e.embedding.as_slice())
308                        .collect();
309                    let entries_b: Vec<&[f32]> = sess_b
310                        .entries
311                        .iter()
312                        .filter(|e| e.stage == stage)
313                        .map(|e| e.embedding.as_slice())
314                        .collect();
315
316                    if entries_a.is_empty() || entries_b.is_empty() {
317                        continue;
318                    }
319
320                    // Find best match for each entry in A against entries in B
321                    for emb_a in &entries_a {
322                        for emb_b in &entries_b {
323                            if emb_a.len() == emb_b.len() && !emb_a.is_empty() {
324                                let sim = cosine_similarity(emb_a, emb_b);
325                                if sim >= min_similarity {
326                                    session_pairs
327                                        .push((sess_a.id.clone(), sess_b.id.clone()));
328                                    scores.push(sim);
329                                    if !convergent_stages.contains(&stage) {
330                                        convergent_stages.push(stage);
331                                    }
332                                }
333                            }
334                        }
335                    }
336                }
337            }
338        }
339
340        // Compute consensus embedding (mean of all converging embeddings)
341        let consensus_embedding = if !scores.is_empty() {
342            let mut consensus = vec![0.0f32; self.config.dimensions];
343            let mut count = 0usize;
344
345            for session in &relevant_sessions {
346                for entry in &session.entries {
347                    if convergent_stages.contains(&entry.stage) {
348                        for (i, &v) in entry.embedding.iter().enumerate() {
349                            if i < self.config.dimensions {
350                                consensus[i] += v;
351                            }
352                        }
353                        count += 1;
354                    }
355                }
356            }
357
358            if count > 0 {
359                for v in &mut consensus {
360                    *v /= count as f32;
361                }
362                Some(consensus)
363            } else {
364                None
365            }
366        } else {
367            None
368        };
369
370        // Sort convergent stages
371        convergent_stages.sort();
372
373        Ok(ConvergenceResult {
374            session_pairs,
375            scores,
376            convergent_stages,
377            consensus_embedding,
378        })
379    }
380
381    /// Get all embeddings for a session.
382    pub fn get_session_embeddings(&self, session_id: &str) -> CrvResult<Vec<CrvSessionEntry>> {
383        let session = self
384            .sessions
385            .get(session_id)
386            .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
387
388        Ok(session
389            .entries
390            .iter()
391            .map(|e| CrvSessionEntry {
392                session_id: session.id.clone(),
393                coordinate: session.coordinate.clone(),
394                stage: e.stage,
395                embedding: e.embedding.clone(),
396                metadata: e.metadata.clone(),
397                timestamp_ms: e.timestamp_ms,
398            })
399            .collect())
400    }
401
402    /// Get the number of entries in a session.
403    pub fn session_entry_count(&self, session_id: &str) -> usize {
404        self.sessions
405            .get(session_id)
406            .map(|s| s.entries.len())
407            .unwrap_or(0)
408    }
409
410    /// Get the number of active sessions.
411    pub fn session_count(&self) -> usize {
412        self.sessions.len()
413    }
414
415    /// Remove a session.
416    pub fn remove_session(&mut self, session_id: &str) -> bool {
417        self.sessions.remove(session_id).is_some()
418    }
419
420    /// Get access to the Stage I encoder for direct operations.
421    pub fn stage_i_encoder(&self) -> &StageIEncoder {
422        &self.stage_i
423    }
424
425    /// Get access to the Stage II encoder for direct operations.
426    pub fn stage_ii_encoder(&self) -> &StageIIEncoder {
427        &self.stage_ii
428    }
429
430    /// Get access to the Stage IV encoder for direct operations.
431    pub fn stage_iv_encoder(&self) -> &StageIVEncoder {
432        &self.stage_iv
433    }
434
435    /// Get access to the Stage V engine for direct operations.
436    pub fn stage_v_engine(&self) -> &StageVEngine {
437        &self.stage_v
438    }
439
440    /// Get access to the Stage VI modeler for direct operations.
441    pub fn stage_vi_modeler(&self) -> &StageVIModeler {
442        &self.stage_vi
443    }
444
445    /// Internal: add an entry to a session.
446    fn add_entry(
447        &mut self,
448        session_id: &str,
449        stage: u8,
450        embedding: Vec<f32>,
451        metadata: HashMap<String, serde_json::Value>,
452    ) -> CrvResult<()> {
453        let session = self
454            .sessions
455            .get_mut(session_id)
456            .ok_or_else(|| CrvError::SessionNotFound(session_id.to_string()))?;
457
458        let entry_index = session.entries.iter().filter(|e| e.stage == stage).count();
459
460        session.entries.push(SessionEntry {
461            embedding,
462            stage,
463            entry_index,
464            metadata,
465            timestamp_ms: 0,
466        });
467
468        Ok(())
469    }
470}
471
472#[cfg(test)]
473mod tests {
474    use super::*;
475
476    fn test_config() -> CrvConfig {
477        CrvConfig {
478            dimensions: 32,
479            convergence_threshold: 0.5,
480            ..CrvConfig::default()
481        }
482    }
483
484    #[test]
485    fn test_session_creation() {
486        let config = test_config();
487        let mut manager = CrvSessionManager::new(config);
488
489        manager
490            .create_session("sess-1".to_string(), "1234-5678".to_string())
491            .unwrap();
492        assert_eq!(manager.session_count(), 1);
493        assert_eq!(manager.session_entry_count("sess-1"), 0);
494    }
495
496    #[test]
497    fn test_add_stage_i() {
498        let config = test_config();
499        let mut manager = CrvSessionManager::new(config);
500
501        manager
502            .create_session("sess-1".to_string(), "1234-5678".to_string())
503            .unwrap();
504
505        let data = StageIData {
506            stroke: vec![(0.0, 0.0), (1.0, 1.0), (2.0, 0.0)],
507            spontaneous_descriptor: "angular".to_string(),
508            classification: GestaltType::Manmade,
509            confidence: 0.9,
510        };
511
512        let emb = manager.add_stage_i("sess-1", &data).unwrap();
513        assert_eq!(emb.len(), 32);
514        assert_eq!(manager.session_entry_count("sess-1"), 1);
515    }
516
517    #[test]
518    fn test_add_stage_ii() {
519        let config = test_config();
520        let mut manager = CrvSessionManager::new(config);
521
522        manager
523            .create_session("sess-1".to_string(), "coord-1".to_string())
524            .unwrap();
525
526        let data = StageIIData {
527            impressions: vec![
528                (SensoryModality::Texture, "rough".to_string()),
529                (SensoryModality::Color, "gray".to_string()),
530            ],
531            feature_vector: None,
532        };
533
534        let emb = manager.add_stage_ii("sess-1", &data).unwrap();
535        assert_eq!(emb.len(), 32);
536    }
537
538    #[test]
539    fn test_full_session_flow() {
540        let config = test_config();
541        let mut manager = CrvSessionManager::new(config);
542
543        manager
544            .create_session("sess-1".to_string(), "coord-1".to_string())
545            .unwrap();
546
547        // Stage I
548        let s1 = StageIData {
549            stroke: vec![(0.0, 0.0), (1.0, 1.0), (2.0, 0.0)],
550            spontaneous_descriptor: "angular".to_string(),
551            classification: GestaltType::Manmade,
552            confidence: 0.9,
553        };
554        manager.add_stage_i("sess-1", &s1).unwrap();
555
556        // Stage II
557        let s2 = StageIIData {
558            impressions: vec![
559                (SensoryModality::Texture, "rough stone".to_string()),
560                (SensoryModality::Temperature, "cold".to_string()),
561            ],
562            feature_vector: None,
563        };
564        manager.add_stage_ii("sess-1", &s2).unwrap();
565
566        // Stage IV
567        let s4 = StageIVData {
568            emotional_impact: vec![("solemn".to_string(), 0.6)],
569            tangibles: vec!["stone blocks".to_string()],
570            intangibles: vec!["ancient".to_string()],
571            aol_detections: vec![],
572        };
573        manager.add_stage_iv("sess-1", &s4).unwrap();
574
575        assert_eq!(manager.session_entry_count("sess-1"), 3);
576
577        // Get all entries
578        let entries = manager.get_session_embeddings("sess-1").unwrap();
579        assert_eq!(entries.len(), 3);
580        assert_eq!(entries[0].stage, 1);
581        assert_eq!(entries[1].stage, 2);
582        assert_eq!(entries[2].stage, 4);
583    }
584
585    #[test]
586    fn test_duplicate_session() {
587        let config = test_config();
588        let mut manager = CrvSessionManager::new(config);
589
590        manager
591            .create_session("sess-1".to_string(), "coord-1".to_string())
592            .unwrap();
593
594        let result = manager.create_session("sess-1".to_string(), "coord-2".to_string());
595        assert!(result.is_err());
596    }
597
598    #[test]
599    fn test_session_not_found() {
600        let config = test_config();
601        let mut manager = CrvSessionManager::new(config);
602
603        let s1 = StageIData {
604            stroke: vec![(0.0, 0.0), (1.0, 1.0)],
605            spontaneous_descriptor: "test".to_string(),
606            classification: GestaltType::Natural,
607            confidence: 0.5,
608        };
609
610        let result = manager.add_stage_i("nonexistent", &s1);
611        assert!(result.is_err());
612    }
613
614    #[test]
615    fn test_remove_session() {
616        let config = test_config();
617        let mut manager = CrvSessionManager::new(config);
618
619        manager
620            .create_session("sess-1".to_string(), "coord-1".to_string())
621            .unwrap();
622        assert_eq!(manager.session_count(), 1);
623
624        assert!(manager.remove_session("sess-1"));
625        assert_eq!(manager.session_count(), 0);
626
627        assert!(!manager.remove_session("sess-1"));
628    }
629}