Skip to main content

elara_runtime/
node.rs

1//! ELARA Node - Runtime loop implementation
2
3use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use elara_core::{
8    Event, EventType, MessageId, MutationOp, NodeId, PacketClass, RepresentationProfile, SessionId,
9    StateId, StateTime, TimeIntent, VersionVector,
10};
11use elara_crypto::{Identity, SecureFrameProcessor};
12use elara_state::ReconciliationEngine;
13use elara_time::TimeEngine;
14use elara_visual::{
15    livestream_state_id, stream_visual_state_id, visual_state_id, PredictionConfig, VisualEncoder,
16    VisualPredictor, VisualState, VisualStateBuffer,
17};
18use elara_wire::{Extensions, FixedHeader, Frame, FrameBuilder, AUTH_TAG_SIZE};
19
20use crate::observability::metrics::NodeMetrics;
21use crate::observability::ObservabilityConfig;
22
23/// ELARA Node configuration
24///
25/// # Observability
26///
27/// The `observability` field provides unified configuration for all observability
28/// components (logging, tracing, metrics). It is optional and disabled by default.
29///
30/// When enabled, observability components are initialized before the node starts:
31/// - **Logging**: Structured logs with configurable format and output
32/// - **Tracing**: Distributed tracing with OpenTelemetry support
33/// - **Metrics Server**: HTTP server exposing Prometheus metrics
34///
35/// # Example with Observability
36///
37/// ```no_run
38/// use elara_runtime::node::NodeConfig;
39/// use elara_runtime::observability::{
40///     ObservabilityConfig, LoggingConfig, LogLevel, LogFormat, LogOutput,
41///     MetricsServerConfig
42/// };
43/// use std::time::Duration;
44///
45/// let config = NodeConfig {
46///     tick_interval: Duration::from_millis(100),
47///     max_packet_buffer: 1000,
48///     max_outgoing_buffer: 1000,
49///     max_local_events: 1000,
50///     metrics: None,
51///     observability: Some(ObservabilityConfig {
52///         logging: Some(LoggingConfig {
53///             level: LogLevel::Info,
54///             format: LogFormat::Json,
55///             output: LogOutput::Stdout,
56///         }),
57///         tracing: None,
58///         metrics_server: Some(MetricsServerConfig {
59///             bind_address: "0.0.0.0".to_string(),
60///             port: 9090,
61///         }),
62///     }),
63/// };
64/// ```
65///
66/// # Example without Observability (Default)
67///
68/// ```no_run
69/// use elara_runtime::node::NodeConfig;
70/// use std::time::Duration;
71///
72/// let config = NodeConfig {
73///     tick_interval: Duration::from_millis(100),
74///     max_packet_buffer: 1000,
75///     max_outgoing_buffer: 1000,
76///     max_local_events: 1000,
77///     metrics: None,
78///     observability: None, // Observability disabled
79/// };
80/// ```
81#[derive(Clone, Debug)]
82pub struct NodeConfig {
83    /// Tick interval
84    pub tick_interval: Duration,
85    /// Maximum incoming packet buffer
86    pub max_packet_buffer: usize,
87    /// Maximum outgoing packet buffer
88    pub max_outgoing_buffer: usize,
89    pub max_local_events: usize,
90    /// Optional metrics for monitoring (None = metrics disabled)
91    pub metrics: Option<NodeMetrics>,
92    /// Optional unified observability configuration (None = observability disabled)
93    ///
94    /// When set, this enables structured logging, distributed tracing, and/or
95    /// metrics server based on the provided configuration. All components are
96    /// opt-in - set individual fields to `None` to disable specific components.
97    ///
98    /// **Note**: This is separate from the `metrics` field. The `metrics` field
99    /// provides direct access to metrics for the node runtime, while `observability`
100    /// provides a unified initialization system with HTTP server support.
101    pub observability: Option<ObservabilityConfig>,
102    /// Optional health check configuration (None = health checks disabled)
103    ///
104    /// When set, this enables the health check system with built-in checks for:
105    /// - Connection health (minimum active connections)
106    /// - Memory usage (maximum memory threshold)
107    /// - Time drift (maximum drift from network consensus)
108    /// - State divergence (maximum pending events)
109    ///
110    /// Health checks are opt-in and disabled by default. When enabled, you can
111    /// configure thresholds for each check and optionally expose HTTP endpoints
112    /// for Kubernetes probes and load balancers.
113    ///
114    /// # Example
115    ///
116    /// ```rust,no_run
117    /// use elara_runtime::node::NodeConfig;
118    /// use elara_runtime::health::HealthCheckConfig;
119    /// use std::time::Duration;
120    ///
121    /// let config = NodeConfig {
122    ///     health_checks: Some(HealthCheckConfig::medium_deployment()),
123    ///     ..Default::default()
124    /// };
125    /// ```
126    ///
127    /// # Production Deployment
128    ///
129    /// Use the preset configurations for common deployment sizes:
130    /// - `HealthCheckConfig::small_deployment()` - 10 nodes
131    /// - `HealthCheckConfig::medium_deployment()` - 100 nodes
132    /// - `HealthCheckConfig::large_deployment()` - 1000 nodes
133    ///
134    /// Or customize thresholds based on your specific requirements:
135    ///
136    /// ```rust,no_run
137    /// use elara_runtime::health::HealthCheckConfig;
138    /// use std::time::Duration;
139    ///
140    /// let health_config = HealthCheckConfig {
141    ///     enabled: true,
142    ///     server_bind_address: Some("0.0.0.0:8080".parse().unwrap()),
143    ///     cache_ttl: Duration::from_secs(30),
144    ///     min_connections: Some(5),
145    ///     max_memory_mb: Some(2000),
146    ///     max_time_drift_ms: Some(100),
147    ///     max_pending_events: Some(1000),
148    /// };
149    /// ```
150    pub health_checks: Option<crate::health::HealthCheckConfig>,
151}
152
/// Counters and timings accumulated by the node runtime.
///
/// All counters start at zero (`Default`) and only ever grow.
#[derive(Debug, Clone, Default)]
pub struct RuntimeStats {
    /// Number of `tick` invocations so far.
    pub ticks: u64,
    /// Frames accepted into the incoming buffer (frames dropped because the
    /// buffer was full are not counted).
    pub incoming_queued: u64,
    /// Frames handed out through `pop_outgoing`.
    pub outgoing_popped: u64,
    /// Local events accepted into the local event queue.
    pub local_events_queued: u64,
    /// Events signed by the node (presumably updated by the authorize-and-sign
    /// stage; that code is not visible here — TODO confirm).
    pub events_signed: u64,
    /// Frames drained from the incoming buffer during packet ingestion.
    pub packets_in: u64,
    /// Frames handed out through `pop_outgoing` (incremented together with
    /// `outgoing_popped`).
    pub packets_out: u64,
    /// Wall-clock duration of the most recent `tick`.
    pub last_tick_duration: Duration,
}
164
165#[derive(Clone, Debug)]
166pub struct FeedItem {
167    pub id: MessageId,
168    pub author: NodeId,
169    pub content: Vec<u8>,
170    pub timestamp: StateTime,
171    pub deleted: bool,
172}
173
174#[derive(Clone, Debug, Default)]
175pub struct FeedStream {
176    pub items: Vec<FeedItem>,
177}
178
179#[derive(Clone, Debug)]
180pub struct StreamMetadata {
181    pub source: NodeId,
182    pub started_at: StateTime,
183    pub data: Vec<u8>,
184}
185
186impl FeedItem {
187    fn decode(buf: &[u8]) -> Option<(Self, usize)> {
188        if buf.len() < 27 {
189            return None;
190        }
191
192        let id = MessageId(u64::from_le_bytes(buf[0..8].try_into().ok()?));
193        let author = NodeId::from_bytes(buf[8..16].try_into().ok()?);
194        let timestamp = StateTime::from_micros(i64::from_le_bytes(buf[16..24].try_into().ok()?));
195        let deleted = buf[24] != 0;
196        let mut offset = 25;
197
198        if buf.len() < offset + 2 {
199            return None;
200        }
201        let content_len = u16::from_le_bytes(buf[offset..offset + 2].try_into().ok()?) as usize;
202        offset += 2;
203
204        if buf.len() < offset + content_len {
205            return None;
206        }
207        let content = buf[offset..offset + content_len].to_vec();
208        offset += content_len;
209
210        Some((
211            FeedItem {
212                id,
213                author,
214                content,
215                timestamp,
216                deleted,
217            },
218            offset,
219        ))
220    }
221}
222
223impl FeedStream {
224    fn from_bytes(data: &[u8]) -> Self {
225        let mut stream = FeedStream::default();
226        let mut offset = 0;
227        while offset < data.len() {
228            let Some((item, used)) = FeedItem::decode(&data[offset..]) else {
229                break;
230            };
231            stream.apply_item(item);
232            offset += used;
233        }
234        stream
235    }
236
237    fn apply_item(&mut self, item: FeedItem) {
238        if let Some(existing) = self.items.iter_mut().find(|m| m.id == item.id) {
239            existing.deleted = item.deleted;
240            existing.content = item.content;
241            existing.timestamp = item.timestamp;
242            existing.author = item.author;
243            return;
244        }
245
246        let pos = self
247            .items
248            .binary_search_by(|m| m.timestamp.cmp(&item.timestamp))
249            .unwrap_or_else(|p| p);
250        self.items.insert(pos, item);
251    }
252}
253
254impl Default for NodeConfig {
255    fn default() -> Self {
256        NodeConfig {
257            tick_interval: Duration::from_millis(10),
258            max_packet_buffer: 1000,
259            max_outgoing_buffer: 1000,
260            max_local_events: 1000,
261            metrics: None,
262            observability: None, // Observability disabled by default
263            health_checks: None, // Health checks disabled by default
264        }
265    }
266}
267
268impl NodeConfig {
269    /// Initializes health checks based on the configuration.
270    ///
271    /// This method creates a `HealthChecker` with the configured checks and
272    /// optionally starts an HTTP server to expose health endpoints.
273    ///
274    /// # Arguments
275    ///
276    /// * `node` - Arc reference to the Node for health checks that need node access
277    ///
278    /// # Returns
279    ///
280    /// Returns `Some((checker, server_handle))` if health checks are enabled, where:
281    /// - `checker` is the configured `HealthChecker`
282    /// - `server_handle` is `Some(JoinHandle)` if HTTP server is started, `None` otherwise
283    ///
284    /// Returns `None` if health checks are disabled.
285    ///
286    /// # Example
287    ///
288    /// ```rust,no_run
289    /// use elara_runtime::node::{Node, NodeConfig};
290    /// use elara_runtime::health::HealthCheckConfig;
291    /// use std::sync::Arc;
292    ///
293    /// let config = NodeConfig {
294    ///     health_checks: Some(HealthCheckConfig::medium_deployment()),
295    ///     ..Default::default()
296    /// };
297    ///
298    /// let node = Arc::new(Node::with_config(config.clone()));
299    ///
300    /// if let Some((checker, server_handle)) = config.init_health_checks(node) {
301    ///     println!("Health checks initialized");
302    ///     
303    ///     // Check health programmatically
304    ///     let status = checker.check_health();
305    ///     println!("Health status: {:?}", status.overall);
306    ///     
307    ///     // Server is running in background (if configured)
308    ///     if let Some(handle) = server_handle {
309    ///         // Server will run until handle is dropped or joined
310    ///     }
311    /// }
312    /// ```
313    ///
314    /// # HTTP Endpoints
315    ///
316    /// When `server_bind_address` is configured, the following endpoints are exposed:
317    ///
318    /// - `GET /health` - Overall health status
319    ///   - Returns 200 OK if healthy or degraded
320    ///   - Returns 503 Service Unavailable if unhealthy
321    ///
322    /// - `GET /ready` - Readiness probe (Kubernetes)
323    ///   - Returns 200 OK if healthy or degraded
324    ///   - Returns 503 Service Unavailable if unhealthy
325    ///
326    /// - `GET /live` - Liveness probe (Kubernetes)
327    ///   - Returns 200 OK if healthy or degraded
328    ///   - Returns 503 Service Unavailable if unhealthy
329    ///
330    /// # Panics
331    ///
332    /// Panics if the health check configuration is invalid (fails validation).
333    pub fn init_health_checks(
334        &self,
335        node: Arc<Node>,
336    ) -> Option<(
337        Arc<crate::health::HealthChecker>,
338        Option<tokio::task::JoinHandle<Result<(), std::io::Error>>>,
339    )> {
340        use crate::health::{
341            ConnectionHealthCheck, HealthChecker, MemoryHealthCheck, StateDivergenceCheck,
342            TimeDriftCheck,
343        };
344        use crate::health_server::{HealthServer, HealthServerConfig};
345
346        // Return None if health checks are disabled
347        let health_config = self.health_checks.as_ref()?;
348
349        // Validate configuration
350        health_config
351            .validate()
352            .expect("Invalid health check configuration");
353
354        // Return None if explicitly disabled
355        if !health_config.enabled {
356            return None;
357        }
358
359        // Create health checker with configured cache TTL
360        let mut checker = HealthChecker::new(health_config.cache_ttl);
361
362        // Add built-in health checks based on configuration
363        if let Some(min_connections) = health_config.min_connections {
364            checker.add_check(Box::new(ConnectionHealthCheck::new(
365                node.clone(),
366                min_connections,
367            )));
368        }
369
370        if let Some(max_memory_mb) = health_config.max_memory_mb {
371            checker.add_check(Box::new(MemoryHealthCheck::new(max_memory_mb)));
372        }
373
374        if let Some(max_drift_ms) = health_config.max_time_drift_ms {
375            checker.add_check(Box::new(TimeDriftCheck::new(node.clone(), max_drift_ms)));
376        }
377
378        if let Some(max_pending_events) = health_config.max_pending_events {
379            checker.add_check(Box::new(StateDivergenceCheck::with_threshold(
380                node.clone(),
381                max_pending_events,
382            )));
383        }
384
385        let checker = Arc::new(checker);
386
387        // Start HTTP server if bind address is configured
388        let server_handle = if let Some(bind_address) = health_config.server_bind_address {
389            let server_config = HealthServerConfig { bind_address };
390            let server = HealthServer::new(checker.clone(), server_config);
391
392            // Spawn server in background task
393            let handle = tokio::spawn(async move {
394                server.serve().await
395            });
396
397            Some(handle)
398        } else {
399            None
400        };
401
402        Some((checker, server_handle))
403    }
404}
405
406/// ELARA Node - the runtime entity
407pub struct Node {
408    /// Node identity
409    identity: Identity,
410    /// Current session (if any)
411    session_id: Option<SessionId>,
412    /// Time engine
413    time_engine: TimeEngine,
414    /// State reconciliation engine
415    state_engine: ReconciliationEngine,
416    secure_processor: Option<SecureFrameProcessor>,
417    /// Incoming packet buffer
418    incoming: VecDeque<Frame>,
419    /// Outgoing packet buffer
420    outgoing: VecDeque<Frame>,
421    /// Local events to send
422    local_events: Vec<Event>,
423    /// Event sequence counter
424    event_seq: u64,
425    /// Configuration
426    config: NodeConfig,
427    stats: RuntimeStats,
428    stream_metadata: HashMap<u64, StreamMetadata>,
429    visual_buffers: HashMap<NodeId, VisualStateBuffer>,
430    visual_predictors: HashMap<NodeId, VisualPredictor>,
431    stream_visual_buffers: HashMap<u64, VisualStateBuffer>,
432    stream_visual_predictors: HashMap<u64, VisualPredictor>,
433    /// Optional metrics (cloned from config for convenience)
434    metrics: Option<NodeMetrics>,
435}
436
437impl Node {
438    /// Create a new node with generated identity
439    pub fn new() -> Self {
440        let node = Self::with_config(NodeConfig::default());
441        tracing::info!(
442            node_id = node.node_id().0,
443            "Created new node"
444        );
445        node
446    }
447
448    /// Create a new node with custom configuration
449    pub fn with_config(config: NodeConfig) -> Self {
450        let metrics = config.metrics.clone();
451        Node {
452            identity: Identity::generate(),
453            session_id: None,
454            time_engine: TimeEngine::new(),
455            state_engine: ReconciliationEngine::new(),
456            secure_processor: None,
457            incoming: VecDeque::new(),
458            outgoing: VecDeque::new(),
459            local_events: Vec::new(),
460            event_seq: 0,
461            config,
462            stats: RuntimeStats::default(),
463            stream_metadata: HashMap::new(),
464            visual_buffers: HashMap::new(),
465            visual_predictors: HashMap::new(),
466            stream_visual_buffers: HashMap::new(),
467            stream_visual_predictors: HashMap::new(),
468            metrics,
469        }
470    }
471
472    pub fn with_identity(identity: Identity, config: NodeConfig) -> Self {
473        let metrics = config.metrics.clone();
474        Node {
475            identity,
476            session_id: None,
477            time_engine: TimeEngine::new(),
478            state_engine: ReconciliationEngine::new(),
479            secure_processor: None,
480            incoming: VecDeque::new(),
481            outgoing: VecDeque::new(),
482            local_events: Vec::new(),
483            event_seq: 0,
484            config,
485            stats: RuntimeStats::default(),
486            stream_metadata: HashMap::new(),
487            visual_buffers: HashMap::new(),
488            visual_predictors: HashMap::new(),
489            stream_visual_buffers: HashMap::new(),
490            stream_visual_predictors: HashMap::new(),
491            metrics,
492        }
493    }
494
495    /// Get node ID
496    pub fn node_id(&self) -> NodeId {
497        self.identity.node_id()
498    }
499
500    /// Get session ID (if in session)
501    pub fn session_id(&self) -> Option<SessionId> {
502        self.session_id
503    }
504
505    /// Join a session
506    pub fn join_session(&mut self, session_id: SessionId, session_key: [u8; 32]) {
507        let span = tracing::span!(
508            tracing::Level::INFO,
509            "join_session",
510            node_id = self.node_id().0,
511            session_id = session_id.0
512        );
513        let _enter = span.enter();
514
515        tracing::info!(
516            node_id = self.node_id().0,
517            session_id = session_id.0,
518            "Joining session"
519        );
520
521        self.session_id = Some(session_id);
522        self.secure_processor = Some(SecureFrameProcessor::new(
523            session_id,
524            self.node_id(),
525            session_key,
526        ));
527
528        // Update metrics: increment active connections and total connections
529        if let Some(ref metrics) = self.metrics {
530            metrics.active_connections.inc();
531            metrics.total_connections.inc();
532        }
533    }
534
535    pub fn join_session_unsecured(&mut self, session_id: SessionId) {
536        let span = tracing::span!(
537            tracing::Level::INFO,
538            "join_session_unsecured",
539            node_id = self.node_id().0,
540            session_id = session_id.0
541        );
542        let _enter = span.enter();
543
544        tracing::warn!(
545            node_id = self.node_id().0,
546            session_id = session_id.0,
547            "Joining session without encryption (unsecured mode)"
548        );
549
550        self.session_id = Some(session_id);
551        self.secure_processor = None;
552
553        // Update metrics: increment active connections and total connections
554        if let Some(ref metrics) = self.metrics {
555            metrics.active_connections.inc();
556            metrics.total_connections.inc();
557        }
558    }
559
560    /// Leave current session
561    pub fn leave_session(&mut self) {
562        let span = tracing::span!(
563            tracing::Level::INFO,
564            "leave_session",
565            node_id = self.node_id().0,
566            session_id = ?self.session_id
567        );
568        let _enter = span.enter();
569
570        if let Some(session_id) = self.session_id {
571            tracing::info!(
572                node_id = self.node_id().0,
573                session_id = session_id.0,
574                "Leaving session"
575            );
576
577            // Update metrics: decrement active connections
578            if let Some(ref metrics) = self.metrics {
579                metrics.active_connections.dec();
580            }
581        }
582
583        self.session_id = None;
584        self.secure_processor = None;
585    }
586
587    /// Queue an incoming frame for processing
588    pub fn queue_incoming(&mut self, frame: Frame) {
589        if self.incoming.len() < self.config.max_packet_buffer {
590            self.incoming.push_back(frame);
591            self.stats.incoming_queued += 1;
592        } else {
593            tracing::warn!(
594                node_id = self.node_id().0,
595                buffer_size = self.incoming.len(),
596                max_buffer = self.config.max_packet_buffer,
597                "Incoming packet buffer full, dropping frame"
598            );
599
600            // Update metrics: increment messages_dropped
601            if let Some(ref metrics) = self.metrics {
602                metrics.messages_dropped.inc();
603            }
604        }
605    }
606
607    /// Get next outgoing frame (if any)
608    pub fn pop_outgoing(&mut self) -> Option<Frame> {
609        let frame = self.outgoing.pop_front();
610        if frame.is_some() {
611            self.stats.outgoing_popped += 1;
612            self.stats.packets_out += 1;
613        }
614        frame
615    }
616
617    /// Queue a local event to send
618    pub fn queue_local_event(&mut self, event: Event) {
619        if self.local_events.len() < self.config.max_local_events {
620            self.local_events.push(event);
621            self.stats.local_events_queued += 1;
622        }
623    }
624
625    pub fn queue_visual_keyframe(&mut self, state: &VisualState) {
626        self.queue_visual_event(
627            visual_state_id(state.source),
628            state,
629            EventType::VisualKeyframe,
630        );
631    }
632
633    pub fn queue_visual_delta(&mut self, state: &VisualState) {
634        self.queue_visual_event(visual_state_id(state.source), state, EventType::VisualDelta);
635    }
636
637    pub fn queue_stream_visual_keyframe(&mut self, stream_id: u64, state: &VisualState) {
638        self.queue_visual_event(
639            stream_visual_state_id(stream_id),
640            state,
641            EventType::VisualKeyframe,
642        );
643    }
644
645    pub fn queue_stream_visual_delta(&mut self, stream_id: u64, state: &VisualState) {
646        self.queue_visual_event(
647            stream_visual_state_id(stream_id),
648            state,
649            EventType::VisualDelta,
650        );
651    }
652
653    pub fn queue_stream_start(&mut self, stream_id: u64, metadata: Vec<u8>, timestamp: StateTime) {
654        self.stream_metadata.insert(
655            stream_id,
656            StreamMetadata {
657                source: self.node_id(),
658                started_at: timestamp,
659                data: metadata.clone(),
660            },
661        );
662        let target_state = livestream_state_id(stream_id);
663        let seq = self.next_event_seq();
664        let time_intent = self.time_intent_for(timestamp);
665        let event = Event::new(
666            self.node_id(),
667            seq,
668            EventType::StreamStart,
669            target_state,
670            MutationOp::Set(metadata),
671        )
672        .with_time_intent(time_intent);
673        self.queue_local_event(event);
674    }
675
676    pub fn queue_stream_end(&mut self, stream_id: u64, timestamp: StateTime) {
677        self.stream_metadata.remove(&stream_id);
678        self.stream_visual_buffers.remove(&stream_id);
679        self.stream_visual_predictors.remove(&stream_id);
680        let target_state = livestream_state_id(stream_id);
681        let seq = self.next_event_seq();
682        let time_intent = self.time_intent_for(timestamp);
683        let event = Event::new(
684            self.node_id(),
685            seq,
686            EventType::StreamEnd,
687            target_state,
688            MutationOp::Delete,
689        )
690        .with_time_intent(time_intent);
691        self.queue_local_event(event);
692
693        let visual_state = stream_visual_state_id(stream_id);
694        let seq = self.next_event_seq();
695        let event = Event::new(
696            self.node_id(),
697            seq,
698            EventType::StreamEnd,
699            visual_state,
700            MutationOp::Delete,
701        )
702        .with_time_intent(time_intent);
703        self.queue_local_event(event);
704    }
705
706    pub fn queue_feed_append(&mut self, feed_state: StateId, data: Vec<u8>, timestamp: StateTime) {
707        let seq = self.next_event_seq();
708        let time_intent = self.time_intent_for(timestamp);
709        let event = Event::new(
710            self.node_id(),
711            seq,
712            EventType::FeedAppend,
713            feed_state,
714            MutationOp::Append(data),
715        )
716        .with_time_intent(time_intent);
717        self.queue_local_event(event);
718    }
719
720    pub fn queue_feed_delete(&mut self, feed_state: StateId, timestamp: StateTime) {
721        let seq = self.next_event_seq();
722        let time_intent = self.time_intent_for(timestamp);
723        let event = Event::new(
724            self.node_id(),
725            seq,
726            EventType::FeedDelete,
727            feed_state,
728            MutationOp::Delete,
729        )
730        .with_time_intent(time_intent);
731        self.queue_local_event(event);
732    }
733
734    /// Execute one tick of the runtime loop
735    /// This is the core 12-stage loop
736    pub fn tick(&mut self) {
737        let span = tracing::span!(
738            tracing::Level::INFO,
739            "node_tick",
740            node_id = self.node_id().0,
741            session_id = ?self.session_id
742        );
743        let _enter = span.enter();
744
745        let start = Instant::now();
746        self.stats.ticks += 1;
747
748        // Stage 1: Advance clocks (τp, τs) - NEVER SKIP
749        self.time_engine.tick();
750
751        // Stage 2: Ingest packets
752        let packets = self.ingest_packets();
753
754        // Stage 3: Decrypt and validate
755        let validated = self.decrypt_and_validate(packets);
756
757        // Stage 4: Classify events
758        let classify_start = Instant::now();
759        let events = self.classify_events(validated);
760        
761        // Track message processing latency
762        if let Some(ref metrics) = self.metrics {
763            let latency_ms = classify_start.elapsed().as_secs_f64() * 1000.0;
764            if !events.is_empty() {
765                metrics.message_latency_ms.observe(latency_ms);
766            }
767        }
768
769        // Stage 5: Update time model
770        self.update_time_model(&events);
771
772        // Stage 6: Reconcile state
773        let reconcile_start = Instant::now();
774        let reconcile_result = {
775            let span = tracing::span!(
776                tracing::Level::DEBUG,
777                "state_reconciliation",
778                node_id = self.node_id().0
779            );
780            let _enter = span.enter();
781            
782            let result = self.state_engine.process_events(events, &self.time_engine);
783            self.state_engine.control_divergence();
784            
785            tracing::debug!(
786                applied = result.applied,
787                rejected = result.rejected,
788                "State reconciliation complete"
789            );
790            result
791        };
792
793        // Track state sync latency
794        if let Some(ref metrics) = self.metrics {
795            let sync_latency_ms = reconcile_start.elapsed().as_secs_f64() * 1000.0;
796            metrics.state_sync_latency_ms.observe(sync_latency_ms);
797        }
798
799        // Update state reconciliation metrics
800        if let Some(ref metrics) = self.metrics {
801            // Track quarantine buffer size
802            let quarantine_size = self.state_engine.field().quarantine_size();
803            metrics.quarantine_buffer_size.set(quarantine_size as i64);
804            
805            // Track rejected events as dropped messages
806            if reconcile_result.rejected > 0 {
807                metrics.messages_dropped.inc_by(reconcile_result.rejected as u64);
808            }
809        }
810
811        // Update time drift metric (track maximum offset across all peers)
812        if let Some(ref metrics) = self.metrics {
813            let max_offset_ms = self.time_engine
814                .network()
815                .peers
816                .values()
817                .map(|peer| peer.offset.abs() * 1000.0) // Convert to milliseconds
818                .fold(0.0f64, f64::max);
819            
820            if max_offset_ms > 0.0 {
821                metrics.time_drift_ms.set(max_offset_ms as i64);
822            }
823        }
824
825        // Stage 7: Generate predictions
826        self.generate_predictions();
827
828        // Stage 8: Project to representation (handled externally)
829        // Stage 9: Collect local events (already queued)
830
831        // Stage 10: Authorize and sign
832        let authorized = self.authorize_and_sign();
833
834        // Stage 11: Build packets
835        self.build_packets(authorized);
836
837        // Stage 12: Schedule transmission (handled externally via pop_outgoing)
838
839        self.stats.last_tick_duration = start.elapsed();
840    }
841
842    /// Stage 2: Ingest packets from buffer
843    fn ingest_packets(&mut self) -> Vec<Frame> {
844        let span = tracing::span!(
845            tracing::Level::DEBUG,
846            "ingest_packets",
847            node_id = self.node_id().0
848        );
849        let _enter = span.enter();
850
851        let packets: Vec<Frame> = self.incoming.drain(..).collect();
852        self.stats.packets_in += packets.len() as u64;
853
854        // Update metrics: increment messages_received for each packet
855        if let Some(ref metrics) = self.metrics {
856            metrics.messages_received.inc_by(packets.len() as u64);
857        }
858
859        tracing::debug!(packet_count = packets.len(), "Ingested packets");
860        packets
861    }
862
863    /// Stage 3: Decrypt and validate packets
864    fn decrypt_and_validate(&mut self, packets: Vec<Frame>) -> Vec<Frame> {
865        let span = tracing::span!(
866            tracing::Level::DEBUG,
867            "decrypt_and_validate",
868            node_id = self.node_id().0,
869            packet_count = packets.len()
870        );
871        let _enter = span.enter();
872
873        let Some(processor) = self.secure_processor.as_mut() else {
874            tracing::debug!("No secure processor, skipping decryption");
875            return packets;
876        };
877
878        let initial_count = packets.len();
879        let validated: Vec<Frame> = packets
880            .into_iter()
881            .filter_map(|frame| {
882                let data = frame.serialize().ok()?;
883                let decrypted = processor.decrypt_frame(&data).ok()?;
884                let auth_tag = [0u8; AUTH_TAG_SIZE];
885                Some(Frame {
886                    header: decrypted.header,
887                    extensions: decrypted.extensions,
888                    payload: decrypted.payload,
889                    auth_tag,
890                })
891            })
892            .collect();
893
894        let failed_count = initial_count - validated.len();
895        if failed_count > 0 {
896            tracing::warn!(
897                node_id = self.node_id().0,
898                failed_count = failed_count,
899                validated_count = validated.len(),
900                "Some frames failed decryption/validation"
901            );
902
903            // Update metrics: track failed connections/decryption attempts
904            if let Some(ref metrics) = self.metrics {
905                metrics.failed_connections.inc_by(failed_count as u64);
906            }
907        }
908
909        tracing::debug!(
910            validated_count = validated.len(),
911            failed_count = failed_count,
912            "Decryption and validation complete"
913        );
914        validated
915    }
916
917    /// Stage 4: Extract events from validated packets
918    fn classify_events(&mut self, packets: Vec<Frame>) -> Vec<Event> {
919        let span = tracing::span!(
920            tracing::Level::DEBUG,
921            "classify_events",
922            node_id = self.node_id().0,
923            packet_count = packets.len()
924        );
925        let _enter = span.enter();
926
927        let mut events = Vec::new();
928
929        for frame in packets {
930            let source = frame.header.node_id;
931            let time_hint = frame.header.time_hint;
932            let packet_class = frame.header.class;
933            
934            // Track message size
935            if let Some(ref metrics) = self.metrics {
936                metrics.message_size_bytes.observe(frame.payload.len() as f64);
937            }
938
939            let frame_events = self.decode_event_blocks(&frame.payload, source, time_hint);
940            tracing::trace!(
941                source = source.0,
942                event_count = frame_events.len(),
943                packet_class = ?packet_class,
944                "Decoded events from frame"
945            );
946            
947            for event in &frame_events {
948                self.handle_event_side_effects(event);
949            }
950            events.extend(frame_events);
951        }
952
953        tracing::debug!(event_count = events.len(), "Event classification complete");
954        events
955    }
956
957    fn handle_event_side_effects(&mut self, event: &Event) {
958        match event.event_type {
959            EventType::StreamStart => {
960                let stream_id = event.target_state.instance();
961                let started_at = event.absolute_time(self.time_engine.tau_s());
962
963                if let MutationOp::Set(data) = &event.mutation {
964                    self.stream_metadata.insert(
965                        stream_id,
966                        StreamMetadata {
967                            source: event.source,
968                            started_at,
969                            data: data.clone(),
970                        },
971                    );
972                }
973
974                let field = self.state_engine_mut().field_mut();
975                if field.get(event.target_state).is_none() {
976                    field.create_atom(
977                        event.target_state,
978                        elara_core::StateType::Core,
979                        event.source,
980                    );
981                }
982
983                let visual_state = stream_visual_state_id(stream_id);
984                if field.get(visual_state).is_none() {
985                    field.create_atom(
986                        visual_state,
987                        elara_core::StateType::Perceptual,
988                        event.source,
989                    );
990                }
991            }
992            EventType::StreamEnd => {
993                let stream_id = event.target_state.instance();
994                self.stream_metadata.remove(&stream_id);
995                self.stream_visual_buffers.remove(&stream_id);
996                self.stream_visual_predictors.remove(&stream_id);
997
998                let field = self.state_engine_mut().field_mut();
999                field.remove(livestream_state_id(stream_id));
1000                field.remove(stream_visual_state_id(stream_id));
1001            }
1002            EventType::VisualKeyframe | EventType::VisualDelta => {
1003                if let MutationOp::Set(data) = &event.mutation {
1004                    if let Ok(state) = VisualEncoder::decode(data) {
1005                        let stream_id = if self
1006                            .stream_metadata
1007                            .contains_key(&event.target_state.instance())
1008                        {
1009                            Some(event.target_state.instance())
1010                        } else {
1011                            None
1012                        };
1013                        self.update_visual_state(state, stream_id);
1014                    }
1015                }
1016            }
1017            _ => {}
1018        }
1019    }
1020
1021    /// Stage 5: Update time model from events
1022    fn update_time_model(&mut self, events: &[Event]) {
1023        let span = tracing::span!(
1024            tracing::Level::DEBUG,
1025            "update_time_model",
1026            node_id = self.node_id().0,
1027            event_count = events.len()
1028        );
1029        let _enter = span.enter();
1030
1031        let reference = self.time_engine.tau_s();
1032        for event in events {
1033            let remote_time = event.time_intent.to_absolute(reference);
1034            let seq = (event.id.seq & 0xFFFF) as u16;
1035            self.time_engine
1036                .update_from_packet(event.source, remote_time, seq);
1037        }
1038
1039        tracing::debug!("Time model updated");
1040    }
1041
1042    /// Stage 7: Generate predictions for missing state
1043    fn generate_predictions(&mut self) {
1044        let dt_us = self.config.tick_interval.as_micros() as u64;
1045        {
1046            let field = self.state_engine.field_mut();
1047            for atom in field.atoms.values_mut() {
1048                atom.entropy.time_since_actual =
1049                    atom.entropy.time_since_actual.saturating_add(dt_us);
1050            }
1051        }
1052
1053        let needs_prediction = self.state_engine.field().atoms_needing_prediction(100);
1054        if needs_prediction.is_empty() {
1055            return;
1056        }
1057
1058        let field = self.state_engine.field_mut();
1059        for state_id in needs_prediction {
1060            if let Some(atom) = field.atoms.get_mut(&state_id) {
1061                let increase = match atom.state_type {
1062                    elara_core::StateType::Core => 0.01,
1063                    elara_core::StateType::Perceptual => 0.03,
1064                    elara_core::StateType::Enhancement => 0.05,
1065                    elara_core::StateType::Cosmetic => 0.07,
1066                };
1067                atom.entropy.increase(increase);
1068            }
1069        }
1070    }
1071
1072    /// Stage 10: Authorize and sign local events
1073    fn authorize_and_sign(&mut self) -> Vec<Event> {
1074        let span = tracing::span!(
1075            tracing::Level::DEBUG,
1076            "authorize_and_sign",
1077            node_id = self.node_id().0,
1078            event_count = self.local_events.len()
1079        );
1080        let _enter = span.enter();
1081
1082        let events: Vec<Event> = self.local_events.drain(..).collect();
1083        self.stats.events_signed += events.len() as u64;
1084
1085        let signed_events = events
1086            .into_iter()
1087            .map(|mut event| {
1088                // Sign event
1089                let signature = self.identity.sign(&event.mutation.encode());
1090                event.authority_proof.signature = signature;
1091                event
1092            })
1093            .collect();
1094
1095        tracing::debug!("Events authorized and signed");
1096        signed_events
1097    }
1098
1099    /// Stage 11: Build packets from authorized events
1100    fn build_packets(&mut self, _events: Vec<Event>) {
1101        let span = tracing::span!(
1102            tracing::Level::DEBUG,
1103            "build_packets",
1104            node_id = self.node_id().0,
1105            event_count = _events.len()
1106        );
1107        let _enter = span.enter();
1108
1109        let Some(processor) = self.secure_processor.as_mut() else {
1110            self.build_plain_packets(_events);
1111            return;
1112        };
1113
1114        let mut packets_built = 0;
1115        for event in _events {
1116            if self.outgoing.len() >= self.config.max_outgoing_buffer {
1117                // Buffer full - drop message
1118                if let Some(ref metrics) = self.metrics {
1119                    metrics.messages_dropped.inc();
1120                }
1121                tracing::warn!("Outgoing buffer full, dropping messages");
1122                break;
1123            }
1124
1125            let class = Self::class_for_event(&event);
1126            let profile = Self::profile_for_event(&event);
1127            let time_hint = event.time_intent.ts_offset();
1128            let payload = Self::encode_event_block(&event);
1129
1130            if let Ok(bytes) =
1131                processor.encrypt_frame(class, profile, time_hint, Extensions::new(), &payload)
1132            {
1133                if let Ok(frame) = Frame::parse(&bytes) {
1134                    self.outgoing.push_back(frame);
1135                    packets_built += 1;
1136                    
1137                    // Update metrics: increment messages_sent
1138                    if let Some(ref metrics) = self.metrics {
1139                        metrics.messages_sent.inc();
1140                        metrics.message_size_bytes.observe(bytes.len() as f64);
1141                    }
1142                }
1143            }
1144        }
1145
1146        tracing::debug!(packets_built = packets_built, "Packets built");
1147    }
1148
1149    fn build_plain_packets(&mut self, events: Vec<Event>) {
1150        let span = tracing::span!(
1151            tracing::Level::DEBUG,
1152            "build_plain_packets",
1153            node_id = self.node_id().0,
1154            event_count = events.len()
1155        );
1156        let _enter = span.enter();
1157
1158        let mut packets_built = 0;
1159        for event in events {
1160            if self.outgoing.len() >= self.config.max_outgoing_buffer {
1161                // Buffer full - drop message
1162                if let Some(ref metrics) = self.metrics {
1163                    metrics.messages_dropped.inc();
1164                }
1165                tracing::warn!("Outgoing buffer full, dropping messages");
1166                break;
1167            }
1168
1169            let class = Self::class_for_event(&event);
1170            let profile = Self::profile_for_event(&event);
1171            let time_hint = event.time_intent.ts_offset();
1172            let payload = Self::encode_event_block(&event);
1173
1174            let session_id = self.session_id.unwrap_or(SessionId::ZERO);
1175            let mut header = FixedHeader::new(session_id, self.node_id());
1176            header.class = class;
1177            header.profile = profile;
1178            header.time_hint = time_hint;
1179
1180            let frame = FrameBuilder::new(header).payload(payload.clone()).build();
1181            self.outgoing.push_back(frame);
1182            packets_built += 1;
1183
1184            // Update metrics: increment messages_sent
1185            if let Some(ref metrics) = self.metrics {
1186                metrics.messages_sent.inc();
1187                metrics.message_size_bytes.observe(payload.len() as f64);
1188            }
1189        }
1190
1191        tracing::debug!(packets_built = packets_built, "Plain packets built");
1192    }
1193
1194    fn decode_event_blocks(&self, payload: &[u8], source: NodeId, time_hint: i32) -> Vec<Event> {
1195        let mut events = Vec::new();
1196        let mut offset = 0;
1197
1198        while payload.len().saturating_sub(offset) >= 13 {
1199            let event_type = match EventType::from_byte(payload[offset]) {
1200                Some(t) => t,
1201                None => break,
1202            };
1203            offset += 1;
1204
1205            let state_end = offset + 8;
1206            if state_end > payload.len() {
1207                break;
1208            }
1209            let state_id = match payload[offset..state_end].try_into() {
1210                Ok(bytes) => StateId::from_bytes(bytes),
1211                Err(_) => break,
1212            };
1213            offset = state_end;
1214
1215            let version_len_end = offset + 2;
1216            if version_len_end > payload.len() {
1217                break;
1218            }
1219            let version_len = match payload[offset..version_len_end].try_into() {
1220                Ok(bytes) => u16::from_le_bytes(bytes) as usize,
1221                Err(_) => break,
1222            };
1223            offset = version_len_end;
1224
1225            let version_end = offset + version_len;
1226            if version_end > payload.len() {
1227                break;
1228            }
1229            let version_ref = match Self::decode_version_vector(&payload[offset..version_end]) {
1230                Some(v) => v,
1231                None => break,
1232            };
1233            offset = version_end;
1234
1235            let delta_len_end = offset + 2;
1236            if delta_len_end > payload.len() {
1237                break;
1238            }
1239            let delta_len = match payload[offset..delta_len_end].try_into() {
1240                Ok(bytes) => u16::from_le_bytes(bytes) as usize,
1241                Err(_) => break,
1242            };
1243            offset = delta_len_end;
1244
1245            let delta_end = offset + delta_len;
1246            if delta_end > payload.len() {
1247                break;
1248            }
1249            let delta = &payload[offset..delta_end];
1250            let (mutation, used) = match MutationOp::decode(delta) {
1251                Some(decoded) => decoded,
1252                None => break,
1253            };
1254            if used != delta_len {
1255                break;
1256            }
1257            offset = delta_end;
1258
1259            let seq = version_ref.get(source).saturating_add(1);
1260            let event = Event::new(source, seq, event_type, state_id, mutation)
1261                .with_version(version_ref)
1262                .with_time_intent(TimeIntent::new(time_hint));
1263            events.push(event);
1264        }
1265
1266        events
1267    }
1268
1269    fn encode_event_block(event: &Event) -> Vec<u8> {
1270        let mut buf = Vec::new();
1271        buf.push(event.event_type.to_byte());
1272        buf.extend_from_slice(&event.target_state.to_bytes());
1273
1274        let version = Self::encode_version_vector(&event.version_ref);
1275        buf.extend_from_slice(&(version.len() as u16).to_le_bytes());
1276        buf.extend_from_slice(&version);
1277
1278        let delta = event.mutation.encode();
1279        buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
1280        buf.extend_from_slice(&delta);
1281
1282        buf
1283    }
1284
1285    fn encode_version_vector(version: &VersionVector) -> Vec<u8> {
1286        let mut entries = version.to_compact();
1287        entries.sort_by_key(|(node, _)| node.0);
1288        let mut buf = Vec::with_capacity(entries.len() * 16);
1289        for (node, count) in entries {
1290            buf.extend_from_slice(&node.to_bytes());
1291            buf.extend_from_slice(&count.to_le_bytes());
1292        }
1293        buf
1294    }
1295
1296    fn decode_version_vector(buf: &[u8]) -> Option<VersionVector> {
1297        if !buf.len().is_multiple_of(16) {
1298            return None;
1299        }
1300        let mut entries = Vec::new();
1301        for chunk in buf.chunks_exact(16) {
1302            let node = match chunk[0..8].try_into() {
1303                Ok(bytes) => NodeId::from_bytes(bytes),
1304                Err(_) => return None,
1305            };
1306            let count = match chunk[8..16].try_into() {
1307                Ok(bytes) => u64::from_le_bytes(bytes),
1308                Err(_) => return None,
1309            };
1310            entries.push((node, count));
1311        }
1312        Some(VersionVector::from_compact(entries))
1313    }
1314
1315    fn class_for_event(event: &Event) -> PacketClass {
1316        match event.event_type {
1317            EventType::StateRequest | EventType::StateResponse | EventType::GapFill => {
1318                PacketClass::Repair
1319            }
1320            EventType::VoiceFrame | EventType::VoiceMute => PacketClass::Perceptual,
1321            EventType::TypingStart | EventType::TypingStop | EventType::PresenceUpdate => {
1322                PacketClass::Perceptual
1323            }
1324            EventType::VisualKeyframe | EventType::VisualDelta => PacketClass::Perceptual,
1325            EventType::TextAppend
1326            | EventType::TextEdit
1327            | EventType::TextDelete
1328            | EventType::TextReact => PacketClass::Core,
1329            EventType::FeedAppend | EventType::FeedDelete => PacketClass::Core,
1330            EventType::StreamStart | EventType::StreamEnd => PacketClass::Core,
1331            _ => PacketClass::Core,
1332        }
1333    }
1334
1335    fn profile_for_event(event: &Event) -> RepresentationProfile {
1336        match event.event_type {
1337            EventType::VoiceFrame | EventType::VoiceMute => RepresentationProfile::VoiceMinimal,
1338            EventType::VisualKeyframe | EventType::VisualDelta => {
1339                RepresentationProfile::VideoStandard
1340            }
1341            EventType::StreamStart | EventType::StreamEnd => {
1342                RepresentationProfile::StreamAsymmetric
1343            }
1344            _ => RepresentationProfile::Textual,
1345        }
1346    }
1347
1348    fn queue_visual_event(
1349        &mut self,
1350        target_state: StateId,
1351        state: &VisualState,
1352        event_type: EventType,
1353    ) {
1354        let seq = self.next_event_seq();
1355        let time_intent = self.time_intent_for(state.timestamp);
1356        let payload = VisualEncoder::encode(state);
1357        let event = Event::new(
1358            self.node_id(),
1359            seq,
1360            event_type,
1361            target_state,
1362            MutationOp::Set(payload),
1363        )
1364        .with_time_intent(time_intent);
1365        self.queue_local_event(event);
1366    }
1367
1368    fn time_intent_for(&self, timestamp: StateTime) -> TimeIntent {
1369        let reference = self.time_engine.tau_s();
1370        let offset = timestamp.to_wire_offset(reference);
1371        TimeIntent::new(offset)
1372    }
1373
1374    fn visual_buffer_config(&self) -> (usize, u32) {
1375        let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
1376        let instability = 1.0 - stability;
1377        let buffer_size = (24.0 + instability * 40.0).round() as usize;
1378        let buffer_delay_ms = (40.0 + instability * 140.0).round() as u32;
1379        (buffer_size.clamp(16, 64), buffer_delay_ms.clamp(30, 200))
1380    }
1381
1382    fn visual_predictor_config(&self) -> PredictionConfig {
1383        let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
1384        let instability = (1.0 - stability) as f32;
1385        PredictionConfig {
1386            max_horizon_ms: ((400.0 + instability as f64 * 800.0).round() as u32).clamp(300, 1200),
1387            confidence_decay: (0.08 + instability * 0.12).clamp(0.06, 0.2),
1388            min_confidence: (0.25 + instability * 0.2).clamp(0.2, 0.5),
1389            ..PredictionConfig::default()
1390        }
1391    }
1392
1393    fn update_visual_state(&mut self, state: VisualState, stream_id: Option<u64>) {
1394        let (buffer_size, buffer_delay_ms) = self.visual_buffer_config();
1395        let predictor_config = self.visual_predictor_config();
1396
1397        let buffer = self
1398            .visual_buffers
1399            .entry(state.source)
1400            .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
1401        buffer.push(state.clone());
1402        let predictor = self
1403            .visual_predictors
1404            .entry(state.source)
1405            .or_insert_with(|| VisualPredictor::new(predictor_config.clone()));
1406        predictor.update(state.clone());
1407
1408        if let Some(stream_id) = stream_id {
1409            let stream_buffer = self
1410                .stream_visual_buffers
1411                .entry(stream_id)
1412                .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
1413            stream_buffer.push(state.clone());
1414            let stream_predictor = self
1415                .stream_visual_predictors
1416                .entry(stream_id)
1417                .or_insert_with(|| VisualPredictor::new(predictor_config));
1418            stream_predictor.update(state);
1419        }
1420    }
1421
1422    pub fn visual_state(&self, node_id: NodeId) -> Option<VisualState> {
1423        let atom = self.state_engine.field().get(visual_state_id(node_id))?;
1424        VisualEncoder::decode(&atom.value).ok()
1425    }
1426
1427    pub fn visual_state_at(
1428        &mut self,
1429        node_id: NodeId,
1430        target_time: StateTime,
1431    ) -> Option<VisualState> {
1432        let state = self
1433            .visual_buffers
1434            .get(&node_id)
1435            .and_then(|buffer| buffer.get_at(target_time));
1436        if state.is_some() {
1437            return state;
1438        }
1439        self.visual_predictors
1440            .get_mut(&node_id)
1441            .and_then(|predictor| predictor.predict(target_time))
1442    }
1443
1444    pub fn visual_state_now(&mut self, node_id: NodeId) -> Option<VisualState> {
1445        let now = self.time_engine.tau_s();
1446        self.visual_state_at(node_id, now)
1447    }
1448
1449    pub fn stream_visual_state(&self, stream_id: u64) -> Option<VisualState> {
1450        let atom = self
1451            .state_engine
1452            .field()
1453            .get(stream_visual_state_id(stream_id))?;
1454        VisualEncoder::decode(&atom.value).ok()
1455    }
1456
1457    pub fn stream_visual_state_at(
1458        &mut self,
1459        stream_id: u64,
1460        target_time: StateTime,
1461    ) -> Option<VisualState> {
1462        let state = self
1463            .stream_visual_buffers
1464            .get(&stream_id)
1465            .and_then(|buffer| buffer.get_at(target_time));
1466        if state.is_some() {
1467            return state;
1468        }
1469        self.stream_visual_predictors
1470            .get_mut(&stream_id)
1471            .and_then(|predictor| predictor.predict(target_time))
1472    }
1473
1474    pub fn stream_visual_state_now(&mut self, stream_id: u64) -> Option<VisualState> {
1475        let now = self.time_engine.tau_s();
1476        self.stream_visual_state_at(stream_id, now)
1477    }
1478
1479    pub fn feed_stream(&self, feed_state: StateId) -> FeedStream {
1480        self.state_engine
1481            .field()
1482            .get(feed_state)
1483            .map(|atom| FeedStream::from_bytes(&atom.value))
1484            .unwrap_or_default()
1485    }
1486
1487    pub fn stream_metadata(&self, stream_id: u64) -> Option<&StreamMetadata> {
1488        self.stream_metadata.get(&stream_id)
1489    }
1490
1491    /// Get reference to time engine
1492    pub fn time_engine(&self) -> &TimeEngine {
1493        &self.time_engine
1494    }
1495
1496    /// Get reference to state engine
1497    pub fn state_engine(&self) -> &ReconciliationEngine {
1498        &self.state_engine
1499    }
1500
1501    /// Get mutable reference to state engine
1502    pub fn state_engine_mut(&mut self) -> &mut ReconciliationEngine {
1503        &mut self.state_engine
1504    }
1505
1506    pub fn stats(&self) -> &RuntimeStats {
1507        &self.stats
1508    }
1509
1510    /// Check if node is in a session
1511    pub fn in_session(&self) -> bool {
1512        self.session_id.is_some()
1513    }
1514
1515    /// Get next event sequence number
1516    pub fn next_event_seq(&mut self) -> u64 {
1517        let seq = self.event_seq;
1518        self.event_seq += 1;
1519        seq
1520    }
1521}
1522
1523impl Default for Node {
1524    fn default() -> Self {
1525        Self::new()
1526    }
1527}
1528
#[cfg(test)]
mod tests {
    use super::*;
    use elara_core::{PacketClass, RepresentationProfile};
    use elara_msp::text::{feed_stream_id as feed_id, FeedItem as MspFeedItem};

    /// Encodes a single event block with an empty version vector, using the
    /// same layout the node decodes from incoming payloads.
    fn build_payload(event_type: EventType, state_id: StateId, mutation: MutationOp) -> Vec<u8> {
        let delta = mutation.encode();

        let mut block = vec![event_type.to_byte()];
        block.extend_from_slice(&state_id.to_bytes());
        // Empty version vector
        block.extend_from_slice(&0u16.to_le_bytes());
        block.extend_from_slice(&(delta.len() as u16).to_le_bytes());
        block.extend_from_slice(&delta);
        block
    }

    /// Builds an unencrypted frame carrying `payload` with the given header fields.
    fn incoming_frame_for(
        session: SessionId,
        source: NodeId,
        class: PacketClass,
        profile: RepresentationProfile,
        time_hint: i32,
        payload: Vec<u8>,
    ) -> Frame {
        let mut header = FixedHeader::new(session, source);
        header.time_hint = time_hint;
        header.profile = profile;
        header.class = class;
        FrameBuilder::new(header).payload(payload).build()
    }

    /// Queues `frame` on `node` and runs one tick so it is processed.
    fn deliver(node: &mut Node, frame: Frame) {
        node.queue_incoming(frame);
        node.tick();
    }

    /// Text-append event from node 1 on state 1 with the given sequence number.
    fn text_append(seq: u64, text: &[u8]) -> Event {
        Event::new(
            NodeId::new(1),
            seq,
            EventType::TextAppend,
            StateId::new(1),
            MutationOp::Append(text.to_vec()),
        )
    }

    #[test]
    fn test_node_creation() {
        assert!(!Node::new().in_session());
    }

    #[test]
    fn test_node_tick() {
        let mut node = Node::new();

        // Should not panic
        for _ in 0..3 {
            node.tick();
        }
    }

    #[test]
    fn test_local_event_buffer_limit() {
        let mut node = Node::with_config(NodeConfig {
            max_local_events: 1,
            ..Default::default()
        });

        node.queue_local_event(text_append(1, b"a"));
        node.queue_local_event(text_append(2, b"b"));

        assert_eq!(node.stats().local_events_queued, 1);
    }

    #[test]
    fn test_prediction_entropy_advances() {
        let mut node = Node::new();
        let state_id = StateId::new(42);
        let owner = node.node_id();

        node.state_engine_mut()
            .field_mut()
            .create_atom(state_id, elara_core::StateType::Core, owner)
            .entropy
            .time_since_actual = 0;

        node.tick();

        let atom = node
            .state_engine()
            .field()
            .get(state_id)
            .expect("atom was created above");
        assert!(atom.entropy.time_since_actual > 0);
    }

    #[test]
    fn test_node_session() {
        let mut node = Node::new();
        let session_id = SessionId::new(12345);

        node.join_session(session_id, [0x42u8; 32]);
        assert!(node.in_session());
        assert_eq!(node.session_id(), Some(session_id));

        node.leave_session();
        assert!(!node.in_session());
    }

    #[test]
    fn test_stream_start_side_effects_on_incoming_frame() {
        let mut node = Node::new();
        let stream_id = 42u64;

        let frame = incoming_frame_for(
            SessionId::new(1),
            NodeId::new(9000),
            PacketClass::Core,
            RepresentationProfile::StreamAsymmetric,
            0,
            build_payload(
                EventType::StreamStart,
                livestream_state_id(stream_id),
                MutationOp::Set(vec![1, 2, 3]),
            ),
        );
        deliver(&mut node, frame);

        let field = node.state_engine().field();
        assert!(field.contains(livestream_state_id(stream_id)));
        assert!(field.contains(stream_visual_state_id(stream_id)));
        assert!(node.stream_metadata(stream_id).is_some());
    }

    #[test]
    fn test_stream_end_removes_atoms() {
        let mut node = Node::new();
        let stream_id = 99u64;

        // Start the stream, then end it; one tick per frame.
        let steps = [
            (EventType::StreamStart, MutationOp::Set(vec![9, 9, 9])),
            (EventType::StreamEnd, MutationOp::Delete),
        ];
        for (event_type, mutation) in steps {
            let payload = build_payload(event_type, livestream_state_id(stream_id), mutation);
            let frame = incoming_frame_for(
                SessionId::new(1),
                NodeId::new(7000),
                PacketClass::Core,
                RepresentationProfile::StreamAsymmetric,
                0,
                payload,
            );
            deliver(&mut node, frame);
        }

        let field = node.state_engine().field();
        assert!(!field.contains(livestream_state_id(stream_id)));
        assert!(!field.contains(stream_visual_state_id(stream_id)));
        assert!(node.stream_metadata(stream_id).is_none());
    }

    #[test]
    fn test_feed_append_roundtrip() {
        let mut node = Node::new();
        let feed_state = feed_id(7);

        // Build a feed item and encode via MSP
        let encoded_item = MspFeedItem::new(
            elara_core::MessageId(1),
            NodeId::new(123),
            b"hello feed".to_vec(),
            elara_core::StateTime::from_millis(0),
        )
        .encode();

        let frame = incoming_frame_for(
            SessionId::new(1),
            NodeId::new(5000),
            PacketClass::Core,
            RepresentationProfile::Textual,
            0,
            build_payload(
                EventType::FeedAppend,
                feed_state,
                MutationOp::Append(encoded_item),
            ),
        );
        deliver(&mut node, frame);

        let stream = node.feed_stream(feed_state);
        assert_eq!(stream.items.len(), 1);

        let first = &stream.items[0];
        assert_eq!(first.id.0, 1);
        assert_eq!(first.author, NodeId::new(123));
        assert_eq!(first.content, b"hello feed".to_vec());
        assert!(!first.deleted);
    }
}