Skip to main content

elara_runtime/
node.rs

1//! ELARA Node - Runtime loop implementation
2
3use std::collections::{HashMap, VecDeque};
4use std::time::{Duration, Instant};
5
6use elara_core::{
7    Event, EventType, MessageId, MutationOp, NodeId, PacketClass, RepresentationProfile, SessionId,
8    StateId, StateTime, TimeIntent, VersionVector,
9};
10use elara_crypto::{Identity, SecureFrameProcessor};
11use elara_state::ReconciliationEngine;
12use elara_time::TimeEngine;
13use elara_visual::{
14    livestream_state_id, stream_visual_state_id, visual_state_id, PredictionConfig, VisualEncoder,
15    VisualPredictor, VisualState, VisualStateBuffer,
16};
17use elara_wire::{Extensions, FixedHeader, Frame, FrameBuilder, AUTH_TAG_SIZE};
18
/// Tunable timing and queue limits for a [`Node`].
#[derive(Clone, Debug)]
pub struct NodeConfig {
    /// Nominal length of one tick; prediction ageing advances by this amount per tick.
    pub tick_interval: Duration,
    /// Capacity of the incoming frame queue; frames offered beyond it are dropped.
    pub max_packet_buffer: usize,
    /// Capacity of the outgoing frame queue; packet building stops once it is reached.
    pub max_outgoing_buffer: usize,
    /// Capacity of the local event queue; events offered beyond it are dropped.
    pub max_local_events: usize,
}
30
/// Running counters describing what the runtime loop has done so far.
#[derive(Clone, Debug, Default)]
pub struct RuntimeStats {
    /// Number of `tick` calls.
    pub ticks: u64,
    /// Frames accepted into the incoming queue.
    pub incoming_queued: u64,
    /// Frames handed out through `pop_outgoing`.
    pub outgoing_popped: u64,
    /// Local events accepted into the local event queue.
    pub local_events_queued: u64,
    /// Local events drained and signed during ticks.
    pub events_signed: u64,
    /// Frames drained from the incoming queue during ticks.
    pub packets_in: u64,
    /// Frames handed out through `pop_outgoing` (bumped together with `outgoing_popped`).
    pub packets_out: u64,
    /// Wall-clock time the most recent tick took.
    pub last_tick_duration: Duration,
}
42
43#[derive(Clone, Debug)]
44pub struct FeedItem {
45    pub id: MessageId,
46    pub author: NodeId,
47    pub content: Vec<u8>,
48    pub timestamp: StateTime,
49    pub deleted: bool,
50}
51
52#[derive(Clone, Debug, Default)]
53pub struct FeedStream {
54    pub items: Vec<FeedItem>,
55}
56
57#[derive(Clone, Debug)]
58pub struct StreamMetadata {
59    pub source: NodeId,
60    pub started_at: StateTime,
61    pub data: Vec<u8>,
62}
63
64impl FeedItem {
65    fn decode(buf: &[u8]) -> Option<(Self, usize)> {
66        if buf.len() < 27 {
67            return None;
68        }
69
70        let id = MessageId(u64::from_le_bytes(buf[0..8].try_into().ok()?));
71        let author = NodeId::from_bytes(buf[8..16].try_into().ok()?);
72        let timestamp = StateTime::from_micros(i64::from_le_bytes(buf[16..24].try_into().ok()?));
73        let deleted = buf[24] != 0;
74        let mut offset = 25;
75
76        if buf.len() < offset + 2 {
77            return None;
78        }
79        let content_len = u16::from_le_bytes(buf[offset..offset + 2].try_into().ok()?) as usize;
80        offset += 2;
81
82        if buf.len() < offset + content_len {
83            return None;
84        }
85        let content = buf[offset..offset + content_len].to_vec();
86        offset += content_len;
87
88        Some((
89            FeedItem {
90                id,
91                author,
92                content,
93                timestamp,
94                deleted,
95            },
96            offset,
97        ))
98    }
99}
100
101impl FeedStream {
102    fn from_bytes(data: &[u8]) -> Self {
103        let mut stream = FeedStream::default();
104        let mut offset = 0;
105        while offset < data.len() {
106            let Some((item, used)) = FeedItem::decode(&data[offset..]) else {
107                break;
108            };
109            stream.apply_item(item);
110            offset += used;
111        }
112        stream
113    }
114
115    fn apply_item(&mut self, item: FeedItem) {
116        if let Some(existing) = self.items.iter_mut().find(|m| m.id == item.id) {
117            existing.deleted = item.deleted;
118            existing.content = item.content;
119            existing.timestamp = item.timestamp;
120            existing.author = item.author;
121            return;
122        }
123
124        let pos = self
125            .items
126            .binary_search_by(|m| m.timestamp.cmp(&item.timestamp))
127            .unwrap_or_else(|p| p);
128        self.items.insert(pos, item);
129    }
130}
131
132impl Default for NodeConfig {
133    fn default() -> Self {
134        NodeConfig {
135            tick_interval: Duration::from_millis(10),
136            max_packet_buffer: 1000,
137            max_outgoing_buffer: 1000,
138            max_local_events: 1000,
139        }
140    }
141}
142
143/// ELARA Node - the runtime entity
144pub struct Node {
145    /// Node identity
146    identity: Identity,
147    /// Current session (if any)
148    session_id: Option<SessionId>,
149    /// Time engine
150    time_engine: TimeEngine,
151    /// State reconciliation engine
152    state_engine: ReconciliationEngine,
153    secure_processor: Option<SecureFrameProcessor>,
154    /// Incoming packet buffer
155    incoming: VecDeque<Frame>,
156    /// Outgoing packet buffer
157    outgoing: VecDeque<Frame>,
158    /// Local events to send
159    local_events: Vec<Event>,
160    /// Event sequence counter
161    event_seq: u64,
162    /// Configuration
163    config: NodeConfig,
164    stats: RuntimeStats,
165    stream_metadata: HashMap<u64, StreamMetadata>,
166    visual_buffers: HashMap<NodeId, VisualStateBuffer>,
167    visual_predictors: HashMap<NodeId, VisualPredictor>,
168    stream_visual_buffers: HashMap<u64, VisualStateBuffer>,
169    stream_visual_predictors: HashMap<u64, VisualPredictor>,
170}
171
172impl Node {
173    /// Create a new node with generated identity
174    pub fn new() -> Self {
175        Self::with_config(NodeConfig::default())
176    }
177
178    /// Create a new node with custom configuration
179    pub fn with_config(config: NodeConfig) -> Self {
180        Node {
181            identity: Identity::generate(),
182            session_id: None,
183            time_engine: TimeEngine::new(),
184            state_engine: ReconciliationEngine::new(),
185            secure_processor: None,
186            incoming: VecDeque::new(),
187            outgoing: VecDeque::new(),
188            local_events: Vec::new(),
189            event_seq: 0,
190            config,
191            stats: RuntimeStats::default(),
192            stream_metadata: HashMap::new(),
193            visual_buffers: HashMap::new(),
194            visual_predictors: HashMap::new(),
195            stream_visual_buffers: HashMap::new(),
196            stream_visual_predictors: HashMap::new(),
197        }
198    }
199
200    pub fn with_identity(identity: Identity, config: NodeConfig) -> Self {
201        Node {
202            identity,
203            session_id: None,
204            time_engine: TimeEngine::new(),
205            state_engine: ReconciliationEngine::new(),
206            secure_processor: None,
207            incoming: VecDeque::new(),
208            outgoing: VecDeque::new(),
209            local_events: Vec::new(),
210            event_seq: 0,
211            config,
212            stats: RuntimeStats::default(),
213            stream_metadata: HashMap::new(),
214            visual_buffers: HashMap::new(),
215            visual_predictors: HashMap::new(),
216            stream_visual_buffers: HashMap::new(),
217            stream_visual_predictors: HashMap::new(),
218        }
219    }
220
221    /// Get node ID
222    pub fn node_id(&self) -> NodeId {
223        self.identity.node_id()
224    }
225
226    /// Get session ID (if in session)
227    pub fn session_id(&self) -> Option<SessionId> {
228        self.session_id
229    }
230
231    /// Join a session
232    pub fn join_session(&mut self, session_id: SessionId, session_key: [u8; 32]) {
233        self.session_id = Some(session_id);
234        self.secure_processor = Some(SecureFrameProcessor::new(
235            session_id,
236            self.node_id(),
237            session_key,
238        ));
239    }
240
241    pub fn join_session_unsecured(&mut self, session_id: SessionId) {
242        self.session_id = Some(session_id);
243        self.secure_processor = None;
244    }
245
246    /// Leave current session
247    pub fn leave_session(&mut self) {
248        self.session_id = None;
249        self.secure_processor = None;
250    }
251
252    /// Queue an incoming frame for processing
253    pub fn queue_incoming(&mut self, frame: Frame) {
254        if self.incoming.len() < self.config.max_packet_buffer {
255            self.incoming.push_back(frame);
256            self.stats.incoming_queued += 1;
257        }
258    }
259
260    /// Get next outgoing frame (if any)
261    pub fn pop_outgoing(&mut self) -> Option<Frame> {
262        let frame = self.outgoing.pop_front();
263        if frame.is_some() {
264            self.stats.outgoing_popped += 1;
265            self.stats.packets_out += 1;
266        }
267        frame
268    }
269
270    /// Queue a local event to send
271    pub fn queue_local_event(&mut self, event: Event) {
272        if self.local_events.len() < self.config.max_local_events {
273            self.local_events.push(event);
274            self.stats.local_events_queued += 1;
275        }
276    }
277
278    pub fn queue_visual_keyframe(&mut self, state: &VisualState) {
279        self.queue_visual_event(
280            visual_state_id(state.source),
281            state,
282            EventType::VisualKeyframe,
283        );
284    }
285
286    pub fn queue_visual_delta(&mut self, state: &VisualState) {
287        self.queue_visual_event(visual_state_id(state.source), state, EventType::VisualDelta);
288    }
289
290    pub fn queue_stream_visual_keyframe(&mut self, stream_id: u64, state: &VisualState) {
291        self.queue_visual_event(
292            stream_visual_state_id(stream_id),
293            state,
294            EventType::VisualKeyframe,
295        );
296    }
297
298    pub fn queue_stream_visual_delta(&mut self, stream_id: u64, state: &VisualState) {
299        self.queue_visual_event(
300            stream_visual_state_id(stream_id),
301            state,
302            EventType::VisualDelta,
303        );
304    }
305
306    pub fn queue_stream_start(&mut self, stream_id: u64, metadata: Vec<u8>, timestamp: StateTime) {
307        self.stream_metadata.insert(
308            stream_id,
309            StreamMetadata {
310                source: self.node_id(),
311                started_at: timestamp,
312                data: metadata.clone(),
313            },
314        );
315        let target_state = livestream_state_id(stream_id);
316        let seq = self.next_event_seq();
317        let time_intent = self.time_intent_for(timestamp);
318        let event = Event::new(
319            self.node_id(),
320            seq,
321            EventType::StreamStart,
322            target_state,
323            MutationOp::Set(metadata),
324        )
325        .with_time_intent(time_intent);
326        self.queue_local_event(event);
327    }
328
329    pub fn queue_stream_end(&mut self, stream_id: u64, timestamp: StateTime) {
330        self.stream_metadata.remove(&stream_id);
331        self.stream_visual_buffers.remove(&stream_id);
332        self.stream_visual_predictors.remove(&stream_id);
333        let target_state = livestream_state_id(stream_id);
334        let seq = self.next_event_seq();
335        let time_intent = self.time_intent_for(timestamp);
336        let event = Event::new(
337            self.node_id(),
338            seq,
339            EventType::StreamEnd,
340            target_state,
341            MutationOp::Delete,
342        )
343        .with_time_intent(time_intent);
344        self.queue_local_event(event);
345
346        let visual_state = stream_visual_state_id(stream_id);
347        let seq = self.next_event_seq();
348        let event = Event::new(
349            self.node_id(),
350            seq,
351            EventType::StreamEnd,
352            visual_state,
353            MutationOp::Delete,
354        )
355        .with_time_intent(time_intent);
356        self.queue_local_event(event);
357    }
358
359    pub fn queue_feed_append(&mut self, feed_state: StateId, data: Vec<u8>, timestamp: StateTime) {
360        let seq = self.next_event_seq();
361        let time_intent = self.time_intent_for(timestamp);
362        let event = Event::new(
363            self.node_id(),
364            seq,
365            EventType::FeedAppend,
366            feed_state,
367            MutationOp::Append(data),
368        )
369        .with_time_intent(time_intent);
370        self.queue_local_event(event);
371    }
372
373    pub fn queue_feed_delete(&mut self, feed_state: StateId, timestamp: StateTime) {
374        let seq = self.next_event_seq();
375        let time_intent = self.time_intent_for(timestamp);
376        let event = Event::new(
377            self.node_id(),
378            seq,
379            EventType::FeedDelete,
380            feed_state,
381            MutationOp::Delete,
382        )
383        .with_time_intent(time_intent);
384        self.queue_local_event(event);
385    }
386
387    /// Execute one tick of the runtime loop
388    /// This is the core 12-stage loop
389    pub fn tick(&mut self) {
390        let start = Instant::now();
391        self.stats.ticks += 1;
392
393        // Stage 1: Advance clocks (τp, τs) - NEVER SKIP
394        self.time_engine.tick();
395
396        // Stage 2: Ingest packets
397        let packets = self.ingest_packets();
398
399        // Stage 3: Decrypt and validate
400        let validated = self.decrypt_and_validate(packets);
401
402        // Stage 4: Classify events
403        let events = self.classify_events(validated);
404
405        // Stage 5: Update time model
406        self.update_time_model(&events);
407
408        // Stage 6: Reconcile state
409        let _result = self.state_engine.process_events(events, &self.time_engine);
410        self.state_engine.control_divergence();
411
412        // Stage 7: Generate predictions
413        self.generate_predictions();
414
415        // Stage 8: Project to representation (handled externally)
416        // Stage 9: Collect local events (already queued)
417
418        // Stage 10: Authorize and sign
419        let authorized = self.authorize_and_sign();
420
421        // Stage 11: Build packets
422        self.build_packets(authorized);
423
424        // Stage 12: Schedule transmission (handled externally via pop_outgoing)
425
426        self.stats.last_tick_duration = start.elapsed();
427    }
428
429    /// Stage 2: Ingest packets from buffer
430    fn ingest_packets(&mut self) -> Vec<Frame> {
431        let packets: Vec<Frame> = self.incoming.drain(..).collect();
432        self.stats.packets_in += packets.len() as u64;
433        packets
434    }
435
436    /// Stage 3: Decrypt and validate packets
437    fn decrypt_and_validate(&mut self, packets: Vec<Frame>) -> Vec<Frame> {
438        let Some(processor) = self.secure_processor.as_mut() else {
439            return packets;
440        };
441
442        packets
443            .into_iter()
444            .filter_map(|frame| {
445                let data = frame.serialize().ok()?;
446                let decrypted = processor.decrypt_frame(&data).ok()?;
447                let auth_tag = [0u8; AUTH_TAG_SIZE];
448                Some(Frame {
449                    header: decrypted.header,
450                    extensions: decrypted.extensions,
451                    payload: decrypted.payload,
452                    auth_tag,
453                })
454            })
455            .collect()
456    }
457
458    /// Stage 4: Extract events from validated packets
459    fn classify_events(&mut self, packets: Vec<Frame>) -> Vec<Event> {
460        let mut events = Vec::new();
461
462        for frame in packets {
463            let source = frame.header.node_id;
464            let time_hint = frame.header.time_hint;
465            let frame_events = self.decode_event_blocks(&frame.payload, source, time_hint);
466            for event in &frame_events {
467                self.handle_event_side_effects(event);
468            }
469            events.extend(frame_events);
470        }
471
472        events
473    }
474
475    fn handle_event_side_effects(&mut self, event: &Event) {
476        match event.event_type {
477            EventType::StreamStart => {
478                let stream_id = event.target_state.instance();
479                let started_at = event.absolute_time(self.time_engine.tau_s());
480
481                if let MutationOp::Set(data) = &event.mutation {
482                    self.stream_metadata.insert(
483                        stream_id,
484                        StreamMetadata {
485                            source: event.source,
486                            started_at,
487                            data: data.clone(),
488                        },
489                    );
490                }
491
492                let field = self.state_engine_mut().field_mut();
493                if field.get(event.target_state).is_none() {
494                    field.create_atom(
495                        event.target_state,
496                        elara_core::StateType::Core,
497                        event.source,
498                    );
499                }
500
501                let visual_state = stream_visual_state_id(stream_id);
502                if field.get(visual_state).is_none() {
503                    field.create_atom(
504                        visual_state,
505                        elara_core::StateType::Perceptual,
506                        event.source,
507                    );
508                }
509            }
510            EventType::StreamEnd => {
511                let stream_id = event.target_state.instance();
512                self.stream_metadata.remove(&stream_id);
513                self.stream_visual_buffers.remove(&stream_id);
514                self.stream_visual_predictors.remove(&stream_id);
515
516                let field = self.state_engine_mut().field_mut();
517                field.remove(livestream_state_id(stream_id));
518                field.remove(stream_visual_state_id(stream_id));
519            }
520            EventType::VisualKeyframe | EventType::VisualDelta => {
521                if let MutationOp::Set(data) = &event.mutation {
522                    if let Ok(state) = VisualEncoder::decode(data) {
523                        let stream_id = if self
524                            .stream_metadata
525                            .contains_key(&event.target_state.instance())
526                        {
527                            Some(event.target_state.instance())
528                        } else {
529                            None
530                        };
531                        self.update_visual_state(state, stream_id);
532                    }
533                }
534            }
535            _ => {}
536        }
537    }
538
539    /// Stage 5: Update time model from events
540    fn update_time_model(&mut self, events: &[Event]) {
541        let reference = self.time_engine.tau_s();
542        for event in events {
543            let remote_time = event.time_intent.to_absolute(reference);
544            let seq = (event.id.seq & 0xFFFF) as u16;
545            self.time_engine
546                .update_from_packet(event.source, remote_time, seq);
547        }
548    }
549
550    /// Stage 7: Generate predictions for missing state
551    fn generate_predictions(&mut self) {
552        let dt_us = self.config.tick_interval.as_micros() as u64;
553        {
554            let field = self.state_engine.field_mut();
555            for atom in field.atoms.values_mut() {
556                atom.entropy.time_since_actual =
557                    atom.entropy.time_since_actual.saturating_add(dt_us);
558            }
559        }
560
561        let needs_prediction = self.state_engine.field().atoms_needing_prediction(100);
562        if needs_prediction.is_empty() {
563            return;
564        }
565
566        let field = self.state_engine.field_mut();
567        for state_id in needs_prediction {
568            if let Some(atom) = field.atoms.get_mut(&state_id) {
569                let increase = match atom.state_type {
570                    elara_core::StateType::Core => 0.01,
571                    elara_core::StateType::Perceptual => 0.03,
572                    elara_core::StateType::Enhancement => 0.05,
573                    elara_core::StateType::Cosmetic => 0.07,
574                };
575                atom.entropy.increase(increase);
576            }
577        }
578    }
579
580    /// Stage 10: Authorize and sign local events
581    fn authorize_and_sign(&mut self) -> Vec<Event> {
582        let events: Vec<Event> = self.local_events.drain(..).collect();
583        self.stats.events_signed += events.len() as u64;
584
585        events
586            .into_iter()
587            .map(|mut event| {
588                // Sign event
589                let signature = self.identity.sign(&event.mutation.encode());
590                event.authority_proof.signature = signature;
591                event
592            })
593            .collect()
594    }
595
596    /// Stage 11: Build packets from authorized events
597    fn build_packets(&mut self, _events: Vec<Event>) {
598        let Some(processor) = self.secure_processor.as_mut() else {
599            self.build_plain_packets(_events);
600            return;
601        };
602
603        for event in _events {
604            if self.outgoing.len() >= self.config.max_outgoing_buffer {
605                break;
606            }
607
608            let class = Self::class_for_event(&event);
609            let profile = Self::profile_for_event(&event);
610            let time_hint = event.time_intent.ts_offset();
611            let payload = Self::encode_event_block(&event);
612
613            if let Ok(bytes) =
614                processor.encrypt_frame(class, profile, time_hint, Extensions::new(), &payload)
615            {
616                if let Ok(frame) = Frame::parse(&bytes) {
617                    self.outgoing.push_back(frame);
618                }
619            }
620        }
621    }
622
623    fn build_plain_packets(&mut self, events: Vec<Event>) {
624        for event in events {
625            if self.outgoing.len() >= self.config.max_outgoing_buffer {
626                break;
627            }
628
629            let class = Self::class_for_event(&event);
630            let profile = Self::profile_for_event(&event);
631            let time_hint = event.time_intent.ts_offset();
632            let payload = Self::encode_event_block(&event);
633
634            let session_id = self.session_id.unwrap_or(SessionId::ZERO);
635            let mut header = FixedHeader::new(session_id, self.node_id());
636            header.class = class;
637            header.profile = profile;
638            header.time_hint = time_hint;
639
640            let frame = FrameBuilder::new(header).payload(payload).build();
641            self.outgoing.push_back(frame);
642        }
643    }
644
645    fn decode_event_blocks(&self, payload: &[u8], source: NodeId, time_hint: i32) -> Vec<Event> {
646        let mut events = Vec::new();
647        let mut offset = 0;
648
649        while payload.len().saturating_sub(offset) >= 13 {
650            let event_type = match EventType::from_byte(payload[offset]) {
651                Some(t) => t,
652                None => break,
653            };
654            offset += 1;
655
656            let state_end = offset + 8;
657            if state_end > payload.len() {
658                break;
659            }
660            let state_id = match payload[offset..state_end].try_into() {
661                Ok(bytes) => StateId::from_bytes(bytes),
662                Err(_) => break,
663            };
664            offset = state_end;
665
666            let version_len_end = offset + 2;
667            if version_len_end > payload.len() {
668                break;
669            }
670            let version_len = match payload[offset..version_len_end].try_into() {
671                Ok(bytes) => u16::from_le_bytes(bytes) as usize,
672                Err(_) => break,
673            };
674            offset = version_len_end;
675
676            let version_end = offset + version_len;
677            if version_end > payload.len() {
678                break;
679            }
680            let version_ref = match Self::decode_version_vector(&payload[offset..version_end]) {
681                Some(v) => v,
682                None => break,
683            };
684            offset = version_end;
685
686            let delta_len_end = offset + 2;
687            if delta_len_end > payload.len() {
688                break;
689            }
690            let delta_len = match payload[offset..delta_len_end].try_into() {
691                Ok(bytes) => u16::from_le_bytes(bytes) as usize,
692                Err(_) => break,
693            };
694            offset = delta_len_end;
695
696            let delta_end = offset + delta_len;
697            if delta_end > payload.len() {
698                break;
699            }
700            let delta = &payload[offset..delta_end];
701            let (mutation, used) = match MutationOp::decode(delta) {
702                Some(decoded) => decoded,
703                None => break,
704            };
705            if used != delta_len {
706                break;
707            }
708            offset = delta_end;
709
710            let seq = version_ref.get(source).saturating_add(1);
711            let event = Event::new(source, seq, event_type, state_id, mutation)
712                .with_version(version_ref)
713                .with_time_intent(TimeIntent::new(time_hint));
714            events.push(event);
715        }
716
717        events
718    }
719
720    fn encode_event_block(event: &Event) -> Vec<u8> {
721        let mut buf = Vec::new();
722        buf.push(event.event_type.to_byte());
723        buf.extend_from_slice(&event.target_state.to_bytes());
724
725        let version = Self::encode_version_vector(&event.version_ref);
726        buf.extend_from_slice(&(version.len() as u16).to_le_bytes());
727        buf.extend_from_slice(&version);
728
729        let delta = event.mutation.encode();
730        buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
731        buf.extend_from_slice(&delta);
732
733        buf
734    }
735
736    fn encode_version_vector(version: &VersionVector) -> Vec<u8> {
737        let mut entries = version.to_compact();
738        entries.sort_by_key(|(node, _)| node.0);
739        let mut buf = Vec::with_capacity(entries.len() * 16);
740        for (node, count) in entries {
741            buf.extend_from_slice(&node.to_bytes());
742            buf.extend_from_slice(&count.to_le_bytes());
743        }
744        buf
745    }
746
747    fn decode_version_vector(buf: &[u8]) -> Option<VersionVector> {
748        if !buf.len().is_multiple_of(16) {
749            return None;
750        }
751        let mut entries = Vec::new();
752        for chunk in buf.chunks_exact(16) {
753            let node = match chunk[0..8].try_into() {
754                Ok(bytes) => NodeId::from_bytes(bytes),
755                Err(_) => return None,
756            };
757            let count = match chunk[8..16].try_into() {
758                Ok(bytes) => u64::from_le_bytes(bytes),
759                Err(_) => return None,
760            };
761            entries.push((node, count));
762        }
763        Some(VersionVector::from_compact(entries))
764    }
765
766    fn class_for_event(event: &Event) -> PacketClass {
767        match event.event_type {
768            EventType::StateRequest | EventType::StateResponse | EventType::GapFill => {
769                PacketClass::Repair
770            }
771            EventType::VoiceFrame | EventType::VoiceMute => PacketClass::Perceptual,
772            EventType::TypingStart | EventType::TypingStop | EventType::PresenceUpdate => {
773                PacketClass::Perceptual
774            }
775            EventType::VisualKeyframe | EventType::VisualDelta => PacketClass::Perceptual,
776            EventType::TextAppend
777            | EventType::TextEdit
778            | EventType::TextDelete
779            | EventType::TextReact => PacketClass::Core,
780            EventType::FeedAppend | EventType::FeedDelete => PacketClass::Core,
781            EventType::StreamStart | EventType::StreamEnd => PacketClass::Core,
782            _ => PacketClass::Core,
783        }
784    }
785
786    fn profile_for_event(event: &Event) -> RepresentationProfile {
787        match event.event_type {
788            EventType::VoiceFrame | EventType::VoiceMute => RepresentationProfile::VoiceMinimal,
789            EventType::VisualKeyframe | EventType::VisualDelta => {
790                RepresentationProfile::VideoStandard
791            }
792            EventType::StreamStart | EventType::StreamEnd => {
793                RepresentationProfile::StreamAsymmetric
794            }
795            _ => RepresentationProfile::Textual,
796        }
797    }
798
799    fn queue_visual_event(
800        &mut self,
801        target_state: StateId,
802        state: &VisualState,
803        event_type: EventType,
804    ) {
805        let seq = self.next_event_seq();
806        let time_intent = self.time_intent_for(state.timestamp);
807        let payload = VisualEncoder::encode(state);
808        let event = Event::new(
809            self.node_id(),
810            seq,
811            event_type,
812            target_state,
813            MutationOp::Set(payload),
814        )
815        .with_time_intent(time_intent);
816        self.queue_local_event(event);
817    }
818
819    fn time_intent_for(&self, timestamp: StateTime) -> TimeIntent {
820        let reference = self.time_engine.tau_s();
821        let offset = timestamp.to_wire_offset(reference);
822        TimeIntent::new(offset)
823    }
824
825    fn visual_buffer_config(&self) -> (usize, u32) {
826        let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
827        let instability = 1.0 - stability;
828        let buffer_size = (24.0 + instability * 40.0).round() as usize;
829        let buffer_delay_ms = (40.0 + instability * 140.0).round() as u32;
830        (buffer_size.clamp(16, 64), buffer_delay_ms.clamp(30, 200))
831    }
832
833    fn visual_predictor_config(&self) -> PredictionConfig {
834        let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
835        let instability = (1.0 - stability) as f32;
836        PredictionConfig {
837            max_horizon_ms: ((400.0 + instability as f64 * 800.0).round() as u32).clamp(300, 1200),
838            confidence_decay: (0.08 + instability * 0.12).clamp(0.06, 0.2),
839            min_confidence: (0.25 + instability * 0.2).clamp(0.2, 0.5),
840            ..PredictionConfig::default()
841        }
842    }
843
844    fn update_visual_state(&mut self, state: VisualState, stream_id: Option<u64>) {
845        let (buffer_size, buffer_delay_ms) = self.visual_buffer_config();
846        let predictor_config = self.visual_predictor_config();
847
848        let buffer = self
849            .visual_buffers
850            .entry(state.source)
851            .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
852        buffer.push(state.clone());
853        let predictor = self
854            .visual_predictors
855            .entry(state.source)
856            .or_insert_with(|| VisualPredictor::new(predictor_config.clone()));
857        predictor.update(state.clone());
858
859        if let Some(stream_id) = stream_id {
860            let stream_buffer = self
861                .stream_visual_buffers
862                .entry(stream_id)
863                .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
864            stream_buffer.push(state.clone());
865            let stream_predictor = self
866                .stream_visual_predictors
867                .entry(stream_id)
868                .or_insert_with(|| VisualPredictor::new(predictor_config));
869            stream_predictor.update(state);
870        }
871    }
872
873    pub fn visual_state(&self, node_id: NodeId) -> Option<VisualState> {
874        let atom = self.state_engine.field().get(visual_state_id(node_id))?;
875        VisualEncoder::decode(&atom.value).ok()
876    }
877
878    pub fn visual_state_at(
879        &mut self,
880        node_id: NodeId,
881        target_time: StateTime,
882    ) -> Option<VisualState> {
883        let state = self
884            .visual_buffers
885            .get(&node_id)
886            .and_then(|buffer| buffer.get_at(target_time));
887        if state.is_some() {
888            return state;
889        }
890        self.visual_predictors
891            .get_mut(&node_id)
892            .and_then(|predictor| predictor.predict(target_time))
893    }
894
895    pub fn visual_state_now(&mut self, node_id: NodeId) -> Option<VisualState> {
896        let now = self.time_engine.tau_s();
897        self.visual_state_at(node_id, now)
898    }
899
900    pub fn stream_visual_state(&self, stream_id: u64) -> Option<VisualState> {
901        let atom = self
902            .state_engine
903            .field()
904            .get(stream_visual_state_id(stream_id))?;
905        VisualEncoder::decode(&atom.value).ok()
906    }
907
908    pub fn stream_visual_state_at(
909        &mut self,
910        stream_id: u64,
911        target_time: StateTime,
912    ) -> Option<VisualState> {
913        let state = self
914            .stream_visual_buffers
915            .get(&stream_id)
916            .and_then(|buffer| buffer.get_at(target_time));
917        if state.is_some() {
918            return state;
919        }
920        self.stream_visual_predictors
921            .get_mut(&stream_id)
922            .and_then(|predictor| predictor.predict(target_time))
923    }
924
925    pub fn stream_visual_state_now(&mut self, stream_id: u64) -> Option<VisualState> {
926        let now = self.time_engine.tau_s();
927        self.stream_visual_state_at(stream_id, now)
928    }
929
930    pub fn feed_stream(&self, feed_state: StateId) -> FeedStream {
931        self.state_engine
932            .field()
933            .get(feed_state)
934            .map(|atom| FeedStream::from_bytes(&atom.value))
935            .unwrap_or_default()
936    }
937
938    pub fn stream_metadata(&self, stream_id: u64) -> Option<&StreamMetadata> {
939        self.stream_metadata.get(&stream_id)
940    }
941
942    /// Get reference to time engine
943    pub fn time_engine(&self) -> &TimeEngine {
944        &self.time_engine
945    }
946
947    /// Get reference to state engine
948    pub fn state_engine(&self) -> &ReconciliationEngine {
949        &self.state_engine
950    }
951
952    /// Get mutable reference to state engine
953    pub fn state_engine_mut(&mut self) -> &mut ReconciliationEngine {
954        &mut self.state_engine
955    }
956
957    pub fn stats(&self) -> &RuntimeStats {
958        &self.stats
959    }
960
961    /// Check if node is in a session
962    pub fn in_session(&self) -> bool {
963        self.session_id.is_some()
964    }
965
966    /// Get next event sequence number
967    pub fn next_event_seq(&mut self) -> u64 {
968        let seq = self.event_seq;
969        self.event_seq += 1;
970        seq
971    }
972}
973
974impl Default for Node {
975    fn default() -> Self {
976        Self::new()
977    }
978}
979
#[cfg(test)]
mod tests {
    use super::*;
    use elara_core::{PacketClass, RepresentationProfile};
    use elara_msp::text::{feed_stream_id as feed_id, FeedItem as MspFeedItem};

    /// Assembles a raw event payload: the event-type byte, the state id
    /// bytes, a zero `u16` marking an empty version vector, and finally the
    /// encoded mutation prefixed with its length as a little-endian `u16`.
    fn build_payload(event_type: EventType, state_id: StateId, mutation: MutationOp) -> Vec<u8> {
        let delta = mutation.encode();

        let mut buf = vec![event_type.to_byte()];
        buf.extend_from_slice(&state_id.to_bytes());
        buf.extend_from_slice(&(0u16).to_le_bytes());
        buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
        buf.extend_from_slice(&delta);
        buf
    }

    /// Builds a frame whose header carries the given session, source, class,
    /// profile and time hint, with `payload` as its body.
    fn incoming_frame_for(
        session: SessionId,
        source: NodeId,
        class: PacketClass,
        profile: RepresentationProfile,
        time_hint: i32,
        payload: Vec<u8>,
    ) -> Frame {
        let mut header = FixedHeader::new(session, source);
        header.class = class;
        header.profile = profile;
        header.time_hint = time_hint;

        FrameBuilder::new(header).payload(payload).build()
    }

    /// Wraps `payload` in a `PacketClass::Core` frame for session 1 with a
    /// zero time hint, queues it on `node` and runs a single tick.
    fn deliver(
        node: &mut Node,
        source: NodeId,
        profile: RepresentationProfile,
        payload: Vec<u8>,
    ) {
        let frame = incoming_frame_for(
            SessionId::new(1),
            source,
            PacketClass::Core,
            profile,
            0,
            payload,
        );
        node.queue_incoming(frame);
        node.tick();
    }

    #[test]
    fn test_node_creation() {
        let fresh = Node::new();
        assert!(!fresh.in_session());
    }

    #[test]
    fn test_node_tick() {
        let mut node = Node::new();

        // Three consecutive ticks on an idle node must not panic.
        for _ in 0..3 {
            node.tick();
        }
    }

    #[test]
    fn test_local_event_buffer_limit() {
        let mut node = Node::with_config(NodeConfig {
            max_local_events: 1,
            ..Default::default()
        });

        // Two appends to the same state from the same author, seq 1 and 2.
        let events: Vec<Event> = [(1, b"a"), (2, b"b")]
            .iter()
            .map(|&(seq, text)| {
                Event::new(
                    NodeId::new(1),
                    seq,
                    EventType::TextAppend,
                    StateId::new(1),
                    MutationOp::Append(text.to_vec()),
                )
            })
            .collect();

        for event in events {
            node.queue_local_event(event);
        }

        // With room for a single local event, only one is counted as queued.
        assert_eq!(node.stats().local_events_queued, 1);
    }

    #[test]
    fn test_prediction_entropy_advances() {
        let mut node = Node::new();
        let state_id = StateId::new(42);
        let owner = node.node_id();

        // Create the atom and reset its entropy clock before ticking.
        node.state_engine_mut()
            .field_mut()
            .create_atom(state_id, elara_core::StateType::Core, owner)
            .entropy
            .time_since_actual = 0;

        node.tick();

        let atom = node.state_engine().field().get(state_id).unwrap();
        assert!(atom.entropy.time_since_actual > 0);
    }

    #[test]
    fn test_node_session() {
        let mut node = Node::new();
        let session_id = SessionId::new(12345);

        node.join_session(session_id, [0x42u8; 32]);
        assert!(node.in_session());
        assert_eq!(node.session_id(), Some(session_id));

        node.leave_session();
        assert!(!node.in_session());
    }

    #[test]
    fn test_stream_start_side_effects_on_incoming_frame() {
        let mut node = Node::new();
        let stream_id = 42u64;

        let payload = build_payload(
            EventType::StreamStart,
            livestream_state_id(stream_id),
            MutationOp::Set(vec![1, 2, 3]),
        );
        deliver(
            &mut node,
            NodeId::new(9000),
            RepresentationProfile::StreamAsymmetric,
            payload,
        );

        let field = node.state_engine().field();
        assert!(field.contains(livestream_state_id(stream_id)));
        assert!(field.contains(stream_visual_state_id(stream_id)));
        assert!(node.stream_metadata(stream_id).is_some());
    }

    #[test]
    fn test_stream_end_removes_atoms() {
        let mut node = Node::new();
        let stream_id = 99u64;
        let broadcaster = NodeId::new(7000);

        // Start the stream ...
        let start_payload = build_payload(
            EventType::StreamStart,
            livestream_state_id(stream_id),
            MutationOp::Set(vec![9, 9, 9]),
        );
        deliver(
            &mut node,
            broadcaster,
            RepresentationProfile::StreamAsymmetric,
            start_payload,
        );

        // ... then end it from the same source.
        let end_payload = build_payload(
            EventType::StreamEnd,
            livestream_state_id(stream_id),
            MutationOp::Delete,
        );
        deliver(
            &mut node,
            broadcaster,
            RepresentationProfile::StreamAsymmetric,
            end_payload,
        );

        let field = node.state_engine().field();
        assert!(!field.contains(livestream_state_id(stream_id)));
        assert!(!field.contains(stream_visual_state_id(stream_id)));
        assert!(node.stream_metadata(stream_id).is_none());
    }

    #[test]
    fn test_feed_append_roundtrip() {
        let mut node = Node::new();
        let feed_state = feed_id(7);

        // Encode a feed item with the MSP encoder and ship it as an append.
        let encoded_item = MspFeedItem::new(
            elara_core::MessageId(1),
            NodeId::new(123),
            b"hello feed".to_vec(),
            elara_core::StateTime::from_millis(0),
        )
        .encode();

        let payload = build_payload(
            EventType::FeedAppend,
            feed_state,
            MutationOp::Append(encoded_item),
        );
        deliver(
            &mut node,
            NodeId::new(5000),
            RepresentationProfile::Textual,
            payload,
        );

        let stream = node.feed_stream(feed_state);
        assert_eq!(stream.items.len(), 1);

        let first = &stream.items[0];
        assert_eq!(first.id.0, 1);
        assert_eq!(first.author, NodeId::new(123));
        assert_eq!(first.content, b"hello feed".to_vec());
        assert!(!first.deleted);
    }
}