1use std::collections::{HashMap, VecDeque};
4use std::time::{Duration, Instant};
5
6use elara_core::{
7 Event, EventType, MessageId, MutationOp, NodeId, PacketClass, RepresentationProfile, SessionId,
8 StateId, StateTime, TimeIntent, VersionVector,
9};
10use elara_crypto::{Identity, SecureFrameProcessor};
11use elara_state::ReconciliationEngine;
12use elara_time::TimeEngine;
13use elara_visual::{
14 livestream_state_id, stream_visual_state_id, visual_state_id, PredictionConfig, VisualEncoder,
15 VisualPredictor, VisualState, VisualStateBuffer,
16};
17use elara_wire::{Extensions, FixedHeader, Frame, FrameBuilder, AUTH_TAG_SIZE};
18
19#[derive(Clone, Debug)]
21pub struct NodeConfig {
22 pub tick_interval: Duration,
24 pub max_packet_buffer: usize,
26 pub max_outgoing_buffer: usize,
28 pub max_local_events: usize,
29}
30
31#[derive(Clone, Debug, Default)]
32pub struct RuntimeStats {
33 pub ticks: u64,
34 pub incoming_queued: u64,
35 pub outgoing_popped: u64,
36 pub local_events_queued: u64,
37 pub events_signed: u64,
38 pub packets_in: u64,
39 pub packets_out: u64,
40 pub last_tick_duration: Duration,
41}
42
43#[derive(Clone, Debug)]
44pub struct FeedItem {
45 pub id: MessageId,
46 pub author: NodeId,
47 pub content: Vec<u8>,
48 pub timestamp: StateTime,
49 pub deleted: bool,
50}
51
52#[derive(Clone, Debug, Default)]
53pub struct FeedStream {
54 pub items: Vec<FeedItem>,
55}
56
57#[derive(Clone, Debug)]
58pub struct StreamMetadata {
59 pub source: NodeId,
60 pub started_at: StateTime,
61 pub data: Vec<u8>,
62}
63
64impl FeedItem {
65 fn decode(buf: &[u8]) -> Option<(Self, usize)> {
66 if buf.len() < 27 {
67 return None;
68 }
69
70 let id = MessageId(u64::from_le_bytes(buf[0..8].try_into().ok()?));
71 let author = NodeId::from_bytes(buf[8..16].try_into().ok()?);
72 let timestamp = StateTime::from_micros(i64::from_le_bytes(buf[16..24].try_into().ok()?));
73 let deleted = buf[24] != 0;
74 let mut offset = 25;
75
76 if buf.len() < offset + 2 {
77 return None;
78 }
79 let content_len = u16::from_le_bytes(buf[offset..offset + 2].try_into().ok()?) as usize;
80 offset += 2;
81
82 if buf.len() < offset + content_len {
83 return None;
84 }
85 let content = buf[offset..offset + content_len].to_vec();
86 offset += content_len;
87
88 Some((
89 FeedItem {
90 id,
91 author,
92 content,
93 timestamp,
94 deleted,
95 },
96 offset,
97 ))
98 }
99}
100
101impl FeedStream {
102 fn from_bytes(data: &[u8]) -> Self {
103 let mut stream = FeedStream::default();
104 let mut offset = 0;
105 while offset < data.len() {
106 let Some((item, used)) = FeedItem::decode(&data[offset..]) else {
107 break;
108 };
109 stream.apply_item(item);
110 offset += used;
111 }
112 stream
113 }
114
115 fn apply_item(&mut self, item: FeedItem) {
116 if let Some(existing) = self.items.iter_mut().find(|m| m.id == item.id) {
117 existing.deleted = item.deleted;
118 existing.content = item.content;
119 existing.timestamp = item.timestamp;
120 existing.author = item.author;
121 return;
122 }
123
124 let pos = self
125 .items
126 .binary_search_by(|m| m.timestamp.cmp(&item.timestamp))
127 .unwrap_or_else(|p| p);
128 self.items.insert(pos, item);
129 }
130}
131
132impl Default for NodeConfig {
133 fn default() -> Self {
134 NodeConfig {
135 tick_interval: Duration::from_millis(10),
136 max_packet_buffer: 1000,
137 max_outgoing_buffer: 1000,
138 max_local_events: 1000,
139 }
140 }
141}
142
143pub struct Node {
145 identity: Identity,
147 session_id: Option<SessionId>,
149 time_engine: TimeEngine,
151 state_engine: ReconciliationEngine,
153 secure_processor: Option<SecureFrameProcessor>,
154 incoming: VecDeque<Frame>,
156 outgoing: VecDeque<Frame>,
158 local_events: Vec<Event>,
160 event_seq: u64,
162 config: NodeConfig,
164 stats: RuntimeStats,
165 stream_metadata: HashMap<u64, StreamMetadata>,
166 visual_buffers: HashMap<NodeId, VisualStateBuffer>,
167 visual_predictors: HashMap<NodeId, VisualPredictor>,
168 stream_visual_buffers: HashMap<u64, VisualStateBuffer>,
169 stream_visual_predictors: HashMap<u64, VisualPredictor>,
170}
171
172impl Node {
173 pub fn new() -> Self {
175 Self::with_config(NodeConfig::default())
176 }
177
178 pub fn with_config(config: NodeConfig) -> Self {
180 Node {
181 identity: Identity::generate(),
182 session_id: None,
183 time_engine: TimeEngine::new(),
184 state_engine: ReconciliationEngine::new(),
185 secure_processor: None,
186 incoming: VecDeque::new(),
187 outgoing: VecDeque::new(),
188 local_events: Vec::new(),
189 event_seq: 0,
190 config,
191 stats: RuntimeStats::default(),
192 stream_metadata: HashMap::new(),
193 visual_buffers: HashMap::new(),
194 visual_predictors: HashMap::new(),
195 stream_visual_buffers: HashMap::new(),
196 stream_visual_predictors: HashMap::new(),
197 }
198 }
199
200 pub fn with_identity(identity: Identity, config: NodeConfig) -> Self {
201 Node {
202 identity,
203 session_id: None,
204 time_engine: TimeEngine::new(),
205 state_engine: ReconciliationEngine::new(),
206 secure_processor: None,
207 incoming: VecDeque::new(),
208 outgoing: VecDeque::new(),
209 local_events: Vec::new(),
210 event_seq: 0,
211 config,
212 stats: RuntimeStats::default(),
213 stream_metadata: HashMap::new(),
214 visual_buffers: HashMap::new(),
215 visual_predictors: HashMap::new(),
216 stream_visual_buffers: HashMap::new(),
217 stream_visual_predictors: HashMap::new(),
218 }
219 }
220
221 pub fn node_id(&self) -> NodeId {
223 self.identity.node_id()
224 }
225
226 pub fn session_id(&self) -> Option<SessionId> {
228 self.session_id
229 }
230
231 pub fn join_session(&mut self, session_id: SessionId, session_key: [u8; 32]) {
233 self.session_id = Some(session_id);
234 self.secure_processor = Some(SecureFrameProcessor::new(
235 session_id,
236 self.node_id(),
237 session_key,
238 ));
239 }
240
241 pub fn join_session_unsecured(&mut self, session_id: SessionId) {
242 self.session_id = Some(session_id);
243 self.secure_processor = None;
244 }
245
246 pub fn leave_session(&mut self) {
248 self.session_id = None;
249 self.secure_processor = None;
250 }
251
252 pub fn queue_incoming(&mut self, frame: Frame) {
254 if self.incoming.len() < self.config.max_packet_buffer {
255 self.incoming.push_back(frame);
256 self.stats.incoming_queued += 1;
257 }
258 }
259
260 pub fn pop_outgoing(&mut self) -> Option<Frame> {
262 let frame = self.outgoing.pop_front();
263 if frame.is_some() {
264 self.stats.outgoing_popped += 1;
265 self.stats.packets_out += 1;
266 }
267 frame
268 }
269
270 pub fn queue_local_event(&mut self, event: Event) {
272 if self.local_events.len() < self.config.max_local_events {
273 self.local_events.push(event);
274 self.stats.local_events_queued += 1;
275 }
276 }
277
278 pub fn queue_visual_keyframe(&mut self, state: &VisualState) {
279 self.queue_visual_event(
280 visual_state_id(state.source),
281 state,
282 EventType::VisualKeyframe,
283 );
284 }
285
286 pub fn queue_visual_delta(&mut self, state: &VisualState) {
287 self.queue_visual_event(visual_state_id(state.source), state, EventType::VisualDelta);
288 }
289
290 pub fn queue_stream_visual_keyframe(&mut self, stream_id: u64, state: &VisualState) {
291 self.queue_visual_event(
292 stream_visual_state_id(stream_id),
293 state,
294 EventType::VisualKeyframe,
295 );
296 }
297
298 pub fn queue_stream_visual_delta(&mut self, stream_id: u64, state: &VisualState) {
299 self.queue_visual_event(
300 stream_visual_state_id(stream_id),
301 state,
302 EventType::VisualDelta,
303 );
304 }
305
306 pub fn queue_stream_start(&mut self, stream_id: u64, metadata: Vec<u8>, timestamp: StateTime) {
307 self.stream_metadata.insert(
308 stream_id,
309 StreamMetadata {
310 source: self.node_id(),
311 started_at: timestamp,
312 data: metadata.clone(),
313 },
314 );
315 let target_state = livestream_state_id(stream_id);
316 let seq = self.next_event_seq();
317 let time_intent = self.time_intent_for(timestamp);
318 let event = Event::new(
319 self.node_id(),
320 seq,
321 EventType::StreamStart,
322 target_state,
323 MutationOp::Set(metadata),
324 )
325 .with_time_intent(time_intent);
326 self.queue_local_event(event);
327 }
328
329 pub fn queue_stream_end(&mut self, stream_id: u64, timestamp: StateTime) {
330 self.stream_metadata.remove(&stream_id);
331 self.stream_visual_buffers.remove(&stream_id);
332 self.stream_visual_predictors.remove(&stream_id);
333 let target_state = livestream_state_id(stream_id);
334 let seq = self.next_event_seq();
335 let time_intent = self.time_intent_for(timestamp);
336 let event = Event::new(
337 self.node_id(),
338 seq,
339 EventType::StreamEnd,
340 target_state,
341 MutationOp::Delete,
342 )
343 .with_time_intent(time_intent);
344 self.queue_local_event(event);
345
346 let visual_state = stream_visual_state_id(stream_id);
347 let seq = self.next_event_seq();
348 let event = Event::new(
349 self.node_id(),
350 seq,
351 EventType::StreamEnd,
352 visual_state,
353 MutationOp::Delete,
354 )
355 .with_time_intent(time_intent);
356 self.queue_local_event(event);
357 }
358
359 pub fn queue_feed_append(&mut self, feed_state: StateId, data: Vec<u8>, timestamp: StateTime) {
360 let seq = self.next_event_seq();
361 let time_intent = self.time_intent_for(timestamp);
362 let event = Event::new(
363 self.node_id(),
364 seq,
365 EventType::FeedAppend,
366 feed_state,
367 MutationOp::Append(data),
368 )
369 .with_time_intent(time_intent);
370 self.queue_local_event(event);
371 }
372
373 pub fn queue_feed_delete(&mut self, feed_state: StateId, timestamp: StateTime) {
374 let seq = self.next_event_seq();
375 let time_intent = self.time_intent_for(timestamp);
376 let event = Event::new(
377 self.node_id(),
378 seq,
379 EventType::FeedDelete,
380 feed_state,
381 MutationOp::Delete,
382 )
383 .with_time_intent(time_intent);
384 self.queue_local_event(event);
385 }
386
387 pub fn tick(&mut self) {
390 let start = Instant::now();
391 self.stats.ticks += 1;
392
393 self.time_engine.tick();
395
396 let packets = self.ingest_packets();
398
399 let validated = self.decrypt_and_validate(packets);
401
402 let events = self.classify_events(validated);
404
405 self.update_time_model(&events);
407
408 let _result = self.state_engine.process_events(events, &self.time_engine);
410 self.state_engine.control_divergence();
411
412 self.generate_predictions();
414
415 let authorized = self.authorize_and_sign();
420
421 self.build_packets(authorized);
423
424 self.stats.last_tick_duration = start.elapsed();
427 }
428
429 fn ingest_packets(&mut self) -> Vec<Frame> {
431 let packets: Vec<Frame> = self.incoming.drain(..).collect();
432 self.stats.packets_in += packets.len() as u64;
433 packets
434 }
435
436 fn decrypt_and_validate(&mut self, packets: Vec<Frame>) -> Vec<Frame> {
438 let Some(processor) = self.secure_processor.as_mut() else {
439 return packets;
440 };
441
442 packets
443 .into_iter()
444 .filter_map(|frame| {
445 let data = frame.serialize().ok()?;
446 let decrypted = processor.decrypt_frame(&data).ok()?;
447 let auth_tag = [0u8; AUTH_TAG_SIZE];
448 Some(Frame {
449 header: decrypted.header,
450 extensions: decrypted.extensions,
451 payload: decrypted.payload,
452 auth_tag,
453 })
454 })
455 .collect()
456 }
457
458 fn classify_events(&mut self, packets: Vec<Frame>) -> Vec<Event> {
460 let mut events = Vec::new();
461
462 for frame in packets {
463 let source = frame.header.node_id;
464 let time_hint = frame.header.time_hint;
465 let frame_events = self.decode_event_blocks(&frame.payload, source, time_hint);
466 for event in &frame_events {
467 self.handle_event_side_effects(event);
468 }
469 events.extend(frame_events);
470 }
471
472 events
473 }
474
475 fn handle_event_side_effects(&mut self, event: &Event) {
476 match event.event_type {
477 EventType::StreamStart => {
478 let stream_id = event.target_state.instance();
479 let started_at = event.absolute_time(self.time_engine.tau_s());
480
481 if let MutationOp::Set(data) = &event.mutation {
482 self.stream_metadata.insert(
483 stream_id,
484 StreamMetadata {
485 source: event.source,
486 started_at,
487 data: data.clone(),
488 },
489 );
490 }
491
492 let field = self.state_engine_mut().field_mut();
493 if field.get(event.target_state).is_none() {
494 field.create_atom(
495 event.target_state,
496 elara_core::StateType::Core,
497 event.source,
498 );
499 }
500
501 let visual_state = stream_visual_state_id(stream_id);
502 if field.get(visual_state).is_none() {
503 field.create_atom(
504 visual_state,
505 elara_core::StateType::Perceptual,
506 event.source,
507 );
508 }
509 }
510 EventType::StreamEnd => {
511 let stream_id = event.target_state.instance();
512 self.stream_metadata.remove(&stream_id);
513 self.stream_visual_buffers.remove(&stream_id);
514 self.stream_visual_predictors.remove(&stream_id);
515
516 let field = self.state_engine_mut().field_mut();
517 field.remove(livestream_state_id(stream_id));
518 field.remove(stream_visual_state_id(stream_id));
519 }
520 EventType::VisualKeyframe | EventType::VisualDelta => {
521 if let MutationOp::Set(data) = &event.mutation {
522 if let Ok(state) = VisualEncoder::decode(data) {
523 let stream_id = if self
524 .stream_metadata
525 .contains_key(&event.target_state.instance())
526 {
527 Some(event.target_state.instance())
528 } else {
529 None
530 };
531 self.update_visual_state(state, stream_id);
532 }
533 }
534 }
535 _ => {}
536 }
537 }
538
539 fn update_time_model(&mut self, events: &[Event]) {
541 let reference = self.time_engine.tau_s();
542 for event in events {
543 let remote_time = event.time_intent.to_absolute(reference);
544 let seq = (event.id.seq & 0xFFFF) as u16;
545 self.time_engine
546 .update_from_packet(event.source, remote_time, seq);
547 }
548 }
549
550 fn generate_predictions(&mut self) {
552 let dt_us = self.config.tick_interval.as_micros() as u64;
553 {
554 let field = self.state_engine.field_mut();
555 for atom in field.atoms.values_mut() {
556 atom.entropy.time_since_actual =
557 atom.entropy.time_since_actual.saturating_add(dt_us);
558 }
559 }
560
561 let needs_prediction = self.state_engine.field().atoms_needing_prediction(100);
562 if needs_prediction.is_empty() {
563 return;
564 }
565
566 let field = self.state_engine.field_mut();
567 for state_id in needs_prediction {
568 if let Some(atom) = field.atoms.get_mut(&state_id) {
569 let increase = match atom.state_type {
570 elara_core::StateType::Core => 0.01,
571 elara_core::StateType::Perceptual => 0.03,
572 elara_core::StateType::Enhancement => 0.05,
573 elara_core::StateType::Cosmetic => 0.07,
574 };
575 atom.entropy.increase(increase);
576 }
577 }
578 }
579
580 fn authorize_and_sign(&mut self) -> Vec<Event> {
582 let events: Vec<Event> = self.local_events.drain(..).collect();
583 self.stats.events_signed += events.len() as u64;
584
585 events
586 .into_iter()
587 .map(|mut event| {
588 let signature = self.identity.sign(&event.mutation.encode());
590 event.authority_proof.signature = signature;
591 event
592 })
593 .collect()
594 }
595
596 fn build_packets(&mut self, _events: Vec<Event>) {
598 let Some(processor) = self.secure_processor.as_mut() else {
599 self.build_plain_packets(_events);
600 return;
601 };
602
603 for event in _events {
604 if self.outgoing.len() >= self.config.max_outgoing_buffer {
605 break;
606 }
607
608 let class = Self::class_for_event(&event);
609 let profile = Self::profile_for_event(&event);
610 let time_hint = event.time_intent.ts_offset();
611 let payload = Self::encode_event_block(&event);
612
613 if let Ok(bytes) =
614 processor.encrypt_frame(class, profile, time_hint, Extensions::new(), &payload)
615 {
616 if let Ok(frame) = Frame::parse(&bytes) {
617 self.outgoing.push_back(frame);
618 }
619 }
620 }
621 }
622
623 fn build_plain_packets(&mut self, events: Vec<Event>) {
624 for event in events {
625 if self.outgoing.len() >= self.config.max_outgoing_buffer {
626 break;
627 }
628
629 let class = Self::class_for_event(&event);
630 let profile = Self::profile_for_event(&event);
631 let time_hint = event.time_intent.ts_offset();
632 let payload = Self::encode_event_block(&event);
633
634 let session_id = self.session_id.unwrap_or(SessionId::ZERO);
635 let mut header = FixedHeader::new(session_id, self.node_id());
636 header.class = class;
637 header.profile = profile;
638 header.time_hint = time_hint;
639
640 let frame = FrameBuilder::new(header).payload(payload).build();
641 self.outgoing.push_back(frame);
642 }
643 }
644
645 fn decode_event_blocks(&self, payload: &[u8], source: NodeId, time_hint: i32) -> Vec<Event> {
646 let mut events = Vec::new();
647 let mut offset = 0;
648
649 while payload.len().saturating_sub(offset) >= 13 {
650 let event_type = match EventType::from_byte(payload[offset]) {
651 Some(t) => t,
652 None => break,
653 };
654 offset += 1;
655
656 let state_end = offset + 8;
657 if state_end > payload.len() {
658 break;
659 }
660 let state_id = match payload[offset..state_end].try_into() {
661 Ok(bytes) => StateId::from_bytes(bytes),
662 Err(_) => break,
663 };
664 offset = state_end;
665
666 let version_len_end = offset + 2;
667 if version_len_end > payload.len() {
668 break;
669 }
670 let version_len = match payload[offset..version_len_end].try_into() {
671 Ok(bytes) => u16::from_le_bytes(bytes) as usize,
672 Err(_) => break,
673 };
674 offset = version_len_end;
675
676 let version_end = offset + version_len;
677 if version_end > payload.len() {
678 break;
679 }
680 let version_ref = match Self::decode_version_vector(&payload[offset..version_end]) {
681 Some(v) => v,
682 None => break,
683 };
684 offset = version_end;
685
686 let delta_len_end = offset + 2;
687 if delta_len_end > payload.len() {
688 break;
689 }
690 let delta_len = match payload[offset..delta_len_end].try_into() {
691 Ok(bytes) => u16::from_le_bytes(bytes) as usize,
692 Err(_) => break,
693 };
694 offset = delta_len_end;
695
696 let delta_end = offset + delta_len;
697 if delta_end > payload.len() {
698 break;
699 }
700 let delta = &payload[offset..delta_end];
701 let (mutation, used) = match MutationOp::decode(delta) {
702 Some(decoded) => decoded,
703 None => break,
704 };
705 if used != delta_len {
706 break;
707 }
708 offset = delta_end;
709
710 let seq = version_ref.get(source).saturating_add(1);
711 let event = Event::new(source, seq, event_type, state_id, mutation)
712 .with_version(version_ref)
713 .with_time_intent(TimeIntent::new(time_hint));
714 events.push(event);
715 }
716
717 events
718 }
719
720 fn encode_event_block(event: &Event) -> Vec<u8> {
721 let mut buf = Vec::new();
722 buf.push(event.event_type.to_byte());
723 buf.extend_from_slice(&event.target_state.to_bytes());
724
725 let version = Self::encode_version_vector(&event.version_ref);
726 buf.extend_from_slice(&(version.len() as u16).to_le_bytes());
727 buf.extend_from_slice(&version);
728
729 let delta = event.mutation.encode();
730 buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
731 buf.extend_from_slice(&delta);
732
733 buf
734 }
735
736 fn encode_version_vector(version: &VersionVector) -> Vec<u8> {
737 let mut entries = version.to_compact();
738 entries.sort_by_key(|(node, _)| node.0);
739 let mut buf = Vec::with_capacity(entries.len() * 16);
740 for (node, count) in entries {
741 buf.extend_from_slice(&node.to_bytes());
742 buf.extend_from_slice(&count.to_le_bytes());
743 }
744 buf
745 }
746
747 fn decode_version_vector(buf: &[u8]) -> Option<VersionVector> {
748 if !buf.len().is_multiple_of(16) {
749 return None;
750 }
751 let mut entries = Vec::new();
752 for chunk in buf.chunks_exact(16) {
753 let node = match chunk[0..8].try_into() {
754 Ok(bytes) => NodeId::from_bytes(bytes),
755 Err(_) => return None,
756 };
757 let count = match chunk[8..16].try_into() {
758 Ok(bytes) => u64::from_le_bytes(bytes),
759 Err(_) => return None,
760 };
761 entries.push((node, count));
762 }
763 Some(VersionVector::from_compact(entries))
764 }
765
766 fn class_for_event(event: &Event) -> PacketClass {
767 match event.event_type {
768 EventType::StateRequest | EventType::StateResponse | EventType::GapFill => {
769 PacketClass::Repair
770 }
771 EventType::VoiceFrame | EventType::VoiceMute => PacketClass::Perceptual,
772 EventType::TypingStart | EventType::TypingStop | EventType::PresenceUpdate => {
773 PacketClass::Perceptual
774 }
775 EventType::VisualKeyframe | EventType::VisualDelta => PacketClass::Perceptual,
776 EventType::TextAppend
777 | EventType::TextEdit
778 | EventType::TextDelete
779 | EventType::TextReact => PacketClass::Core,
780 EventType::FeedAppend | EventType::FeedDelete => PacketClass::Core,
781 EventType::StreamStart | EventType::StreamEnd => PacketClass::Core,
782 _ => PacketClass::Core,
783 }
784 }
785
786 fn profile_for_event(event: &Event) -> RepresentationProfile {
787 match event.event_type {
788 EventType::VoiceFrame | EventType::VoiceMute => RepresentationProfile::VoiceMinimal,
789 EventType::VisualKeyframe | EventType::VisualDelta => {
790 RepresentationProfile::VideoStandard
791 }
792 EventType::StreamStart | EventType::StreamEnd => {
793 RepresentationProfile::StreamAsymmetric
794 }
795 _ => RepresentationProfile::Textual,
796 }
797 }
798
799 fn queue_visual_event(
800 &mut self,
801 target_state: StateId,
802 state: &VisualState,
803 event_type: EventType,
804 ) {
805 let seq = self.next_event_seq();
806 let time_intent = self.time_intent_for(state.timestamp);
807 let payload = VisualEncoder::encode(state);
808 let event = Event::new(
809 self.node_id(),
810 seq,
811 event_type,
812 target_state,
813 MutationOp::Set(payload),
814 )
815 .with_time_intent(time_intent);
816 self.queue_local_event(event);
817 }
818
819 fn time_intent_for(&self, timestamp: StateTime) -> TimeIntent {
820 let reference = self.time_engine.tau_s();
821 let offset = timestamp.to_wire_offset(reference);
822 TimeIntent::new(offset)
823 }
824
825 fn visual_buffer_config(&self) -> (usize, u32) {
826 let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
827 let instability = 1.0 - stability;
828 let buffer_size = (24.0 + instability * 40.0).round() as usize;
829 let buffer_delay_ms = (40.0 + instability * 140.0).round() as u32;
830 (buffer_size.clamp(16, 64), buffer_delay_ms.clamp(30, 200))
831 }
832
833 fn visual_predictor_config(&self) -> PredictionConfig {
834 let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
835 let instability = (1.0 - stability) as f32;
836 PredictionConfig {
837 max_horizon_ms: ((400.0 + instability as f64 * 800.0).round() as u32).clamp(300, 1200),
838 confidence_decay: (0.08 + instability * 0.12).clamp(0.06, 0.2),
839 min_confidence: (0.25 + instability * 0.2).clamp(0.2, 0.5),
840 ..PredictionConfig::default()
841 }
842 }
843
844 fn update_visual_state(&mut self, state: VisualState, stream_id: Option<u64>) {
845 let (buffer_size, buffer_delay_ms) = self.visual_buffer_config();
846 let predictor_config = self.visual_predictor_config();
847
848 let buffer = self
849 .visual_buffers
850 .entry(state.source)
851 .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
852 buffer.push(state.clone());
853 let predictor = self
854 .visual_predictors
855 .entry(state.source)
856 .or_insert_with(|| VisualPredictor::new(predictor_config.clone()));
857 predictor.update(state.clone());
858
859 if let Some(stream_id) = stream_id {
860 let stream_buffer = self
861 .stream_visual_buffers
862 .entry(stream_id)
863 .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
864 stream_buffer.push(state.clone());
865 let stream_predictor = self
866 .stream_visual_predictors
867 .entry(stream_id)
868 .or_insert_with(|| VisualPredictor::new(predictor_config));
869 stream_predictor.update(state);
870 }
871 }
872
873 pub fn visual_state(&self, node_id: NodeId) -> Option<VisualState> {
874 let atom = self.state_engine.field().get(visual_state_id(node_id))?;
875 VisualEncoder::decode(&atom.value).ok()
876 }
877
878 pub fn visual_state_at(
879 &mut self,
880 node_id: NodeId,
881 target_time: StateTime,
882 ) -> Option<VisualState> {
883 let state = self
884 .visual_buffers
885 .get(&node_id)
886 .and_then(|buffer| buffer.get_at(target_time));
887 if state.is_some() {
888 return state;
889 }
890 self.visual_predictors
891 .get_mut(&node_id)
892 .and_then(|predictor| predictor.predict(target_time))
893 }
894
895 pub fn visual_state_now(&mut self, node_id: NodeId) -> Option<VisualState> {
896 let now = self.time_engine.tau_s();
897 self.visual_state_at(node_id, now)
898 }
899
900 pub fn stream_visual_state(&self, stream_id: u64) -> Option<VisualState> {
901 let atom = self
902 .state_engine
903 .field()
904 .get(stream_visual_state_id(stream_id))?;
905 VisualEncoder::decode(&atom.value).ok()
906 }
907
908 pub fn stream_visual_state_at(
909 &mut self,
910 stream_id: u64,
911 target_time: StateTime,
912 ) -> Option<VisualState> {
913 let state = self
914 .stream_visual_buffers
915 .get(&stream_id)
916 .and_then(|buffer| buffer.get_at(target_time));
917 if state.is_some() {
918 return state;
919 }
920 self.stream_visual_predictors
921 .get_mut(&stream_id)
922 .and_then(|predictor| predictor.predict(target_time))
923 }
924
925 pub fn stream_visual_state_now(&mut self, stream_id: u64) -> Option<VisualState> {
926 let now = self.time_engine.tau_s();
927 self.stream_visual_state_at(stream_id, now)
928 }
929
930 pub fn feed_stream(&self, feed_state: StateId) -> FeedStream {
931 self.state_engine
932 .field()
933 .get(feed_state)
934 .map(|atom| FeedStream::from_bytes(&atom.value))
935 .unwrap_or_default()
936 }
937
938 pub fn stream_metadata(&self, stream_id: u64) -> Option<&StreamMetadata> {
939 self.stream_metadata.get(&stream_id)
940 }
941
942 pub fn time_engine(&self) -> &TimeEngine {
944 &self.time_engine
945 }
946
947 pub fn state_engine(&self) -> &ReconciliationEngine {
949 &self.state_engine
950 }
951
952 pub fn state_engine_mut(&mut self) -> &mut ReconciliationEngine {
954 &mut self.state_engine
955 }
956
957 pub fn stats(&self) -> &RuntimeStats {
958 &self.stats
959 }
960
961 pub fn in_session(&self) -> bool {
963 self.session_id.is_some()
964 }
965
966 pub fn next_event_seq(&mut self) -> u64 {
968 let seq = self.event_seq;
969 self.event_seq += 1;
970 seq
971 }
972}
973
974impl Default for Node {
975 fn default() -> Self {
976 Self::new()
977 }
978}
979
980#[cfg(test)]
981mod tests {
982 use super::*;
983 use elara_core::{PacketClass, RepresentationProfile};
984 use elara_msp::text::{feed_stream_id as feed_id, FeedItem as MspFeedItem};
985
986 #[test]
987 fn test_node_creation() {
988 let node = Node::new();
989 assert!(!node.in_session());
990 }
991
992 #[test]
993 fn test_node_tick() {
994 let mut node = Node::new();
995
996 node.tick();
998 node.tick();
999 node.tick();
1000 }
1001
1002 #[test]
1003 fn test_local_event_buffer_limit() {
1004 let config = NodeConfig {
1005 max_local_events: 1,
1006 ..Default::default()
1007 };
1008 let mut node = Node::with_config(config);
1009
1010 let event_a = Event::new(
1011 NodeId::new(1),
1012 1,
1013 EventType::TextAppend,
1014 StateId::new(1),
1015 MutationOp::Append(b"a".to_vec()),
1016 );
1017 let event_b = Event::new(
1018 NodeId::new(1),
1019 2,
1020 EventType::TextAppend,
1021 StateId::new(1),
1022 MutationOp::Append(b"b".to_vec()),
1023 );
1024
1025 node.queue_local_event(event_a);
1026 node.queue_local_event(event_b);
1027
1028 assert_eq!(node.stats().local_events_queued, 1);
1029 }
1030
1031 #[test]
1032 fn test_prediction_entropy_advances() {
1033 let mut node = Node::new();
1034 let state_id = StateId::new(42);
1035 let node_id = node.node_id();
1036
1037 {
1038 let field = node.state_engine_mut().field_mut();
1039 let atom = field.create_atom(state_id, elara_core::StateType::Core, node_id);
1040 atom.entropy.time_since_actual = 0;
1041 }
1042
1043 node.tick();
1044
1045 let field = node.state_engine().field();
1046 let atom = field.get(state_id).unwrap();
1047 assert!(atom.entropy.time_since_actual > 0);
1048 }
1049
1050 #[test]
1051 fn test_node_session() {
1052 let mut node = Node::new();
1053 let session_id = SessionId::new(12345);
1054 let session_key = [0x42u8; 32];
1055
1056 node.join_session(session_id, session_key);
1057 assert!(node.in_session());
1058 assert_eq!(node.session_id(), Some(session_id));
1059
1060 node.leave_session();
1061 assert!(!node.in_session());
1062 }
1063
1064 fn build_payload(event_type: EventType, state_id: StateId, mutation: MutationOp) -> Vec<u8> {
1065 let mut buf = Vec::new();
1066 buf.push(event_type.to_byte());
1067 buf.extend_from_slice(&state_id.to_bytes());
1068
1069 buf.extend_from_slice(&(0u16).to_le_bytes());
1071
1072 let delta = mutation.encode();
1073 buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
1074 buf.extend_from_slice(&delta);
1075
1076 buf
1077 }
1078
1079 fn incoming_frame_for(
1080 session: SessionId,
1081 source: NodeId,
1082 class: PacketClass,
1083 profile: RepresentationProfile,
1084 time_hint: i32,
1085 payload: Vec<u8>,
1086 ) -> Frame {
1087 let mut header = FixedHeader::new(session, source);
1088 header.class = class;
1089 header.profile = profile;
1090 header.time_hint = time_hint;
1091 FrameBuilder::new(header).payload(payload).build()
1092 }
1093
1094 #[test]
1095 fn test_stream_start_side_effects_on_incoming_frame() {
1096 let mut node = Node::new();
1097 let stream_id = 42u64;
1098 let target_state = livestream_state_id(stream_id);
1099
1100 let payload = build_payload(
1101 EventType::StreamStart,
1102 target_state,
1103 MutationOp::Set(vec![1, 2, 3]),
1104 );
1105
1106 let frame = incoming_frame_for(
1107 SessionId::new(1),
1108 NodeId::new(9000),
1109 PacketClass::Core,
1110 RepresentationProfile::StreamAsymmetric,
1111 0,
1112 payload,
1113 );
1114
1115 node.queue_incoming(frame);
1116 node.tick();
1117
1118 let field = node.state_engine().field();
1119 assert!(field.contains(livestream_state_id(stream_id)));
1120 assert!(field.contains(stream_visual_state_id(stream_id)));
1121 assert!(node.stream_metadata(stream_id).is_some());
1122 }
1123
1124 #[test]
1125 fn test_stream_end_removes_atoms() {
1126 let mut node = Node::new();
1127 let stream_id = 99u64;
1128
1129 let start_payload = build_payload(
1131 EventType::StreamStart,
1132 livestream_state_id(stream_id),
1133 MutationOp::Set(vec![9, 9, 9]),
1134 );
1135 let start_frame = incoming_frame_for(
1136 SessionId::new(1),
1137 NodeId::new(7000),
1138 PacketClass::Core,
1139 RepresentationProfile::StreamAsymmetric,
1140 0,
1141 start_payload,
1142 );
1143 node.queue_incoming(start_frame);
1144 node.tick();
1145
1146 let end_payload = build_payload(
1148 EventType::StreamEnd,
1149 livestream_state_id(stream_id),
1150 MutationOp::Delete,
1151 );
1152 let end_frame = incoming_frame_for(
1153 SessionId::new(1),
1154 NodeId::new(7000),
1155 PacketClass::Core,
1156 RepresentationProfile::StreamAsymmetric,
1157 0,
1158 end_payload,
1159 );
1160 node.queue_incoming(end_frame);
1161 node.tick();
1162
1163 let field = node.state_engine().field();
1164 assert!(!field.contains(livestream_state_id(stream_id)));
1165 assert!(!field.contains(stream_visual_state_id(stream_id)));
1166 assert!(node.stream_metadata(stream_id).is_none());
1167 }
1168
1169 #[test]
1170 fn test_feed_append_roundtrip() {
1171 let mut node = Node::new();
1172 let feed_state = feed_id(7);
1173
1174 let item = MspFeedItem::new(
1176 elara_core::MessageId(1),
1177 NodeId::new(123),
1178 b"hello feed".to_vec(),
1179 elara_core::StateTime::from_millis(0),
1180 );
1181 let encoded_item = item.encode();
1182
1183 let payload = build_payload(
1184 EventType::FeedAppend,
1185 feed_state,
1186 MutationOp::Append(encoded_item),
1187 );
1188 let frame = incoming_frame_for(
1189 SessionId::new(1),
1190 NodeId::new(5000),
1191 PacketClass::Core,
1192 RepresentationProfile::Textual,
1193 0,
1194 payload,
1195 );
1196
1197 node.queue_incoming(frame);
1198 node.tick();
1199
1200 let stream = node.feed_stream(feed_state);
1201 assert_eq!(stream.items.len(), 1);
1202 let first = &stream.items[0];
1203 assert_eq!(first.id.0, 1);
1204 assert_eq!(first.author, NodeId::new(123));
1205 assert_eq!(first.content, b"hello feed".to_vec());
1206 assert!(!first.deleted);
1207 }
1208}