1use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use elara_core::{
8 Event, EventType, MessageId, MutationOp, NodeId, PacketClass, RepresentationProfile, SessionId,
9 StateId, StateTime, TimeIntent, VersionVector,
10};
11use elara_crypto::{Identity, SecureFrameProcessor};
12use elara_state::ReconciliationEngine;
13use elara_time::TimeEngine;
14use elara_visual::{
15 livestream_state_id, stream_visual_state_id, visual_state_id, PredictionConfig, VisualEncoder,
16 VisualPredictor, VisualState, VisualStateBuffer,
17};
18use elara_wire::{Extensions, FixedHeader, Frame, FrameBuilder, AUTH_TAG_SIZE};
19
20use crate::observability::metrics::NodeMetrics;
21use crate::observability::ObservabilityConfig;
22
23#[derive(Clone, Debug)]
82pub struct NodeConfig {
83 pub tick_interval: Duration,
85 pub max_packet_buffer: usize,
87 pub max_outgoing_buffer: usize,
89 pub max_local_events: usize,
90 pub metrics: Option<NodeMetrics>,
92 pub observability: Option<ObservabilityConfig>,
102 pub health_checks: Option<crate::health::HealthCheckConfig>,
151}
152
/// Monotonic counters describing what a node has done since it was created.
#[derive(Debug, Default, Clone)]
pub struct RuntimeStats {
    /// Number of completed calls to `tick`.
    pub ticks: u64,
    /// Frames accepted into the incoming queue.
    pub incoming_queued: u64,
    /// Frames handed out from the outgoing queue.
    pub outgoing_popped: u64,
    /// Local events accepted into the local event queue.
    pub local_events_queued: u64,
    /// Local events that went through the signing step.
    pub events_signed: u64,
    /// Frames drained from the incoming queue during ticks.
    pub packets_in: u64,
    /// Frames handed out from the outgoing queue (incremented together with `outgoing_popped`).
    pub packets_out: u64,
    /// Wall-clock duration of the most recent tick.
    pub last_tick_duration: Duration,
}
164
165#[derive(Clone, Debug)]
166pub struct FeedItem {
167 pub id: MessageId,
168 pub author: NodeId,
169 pub content: Vec<u8>,
170 pub timestamp: StateTime,
171 pub deleted: bool,
172}
173
174#[derive(Clone, Debug, Default)]
175pub struct FeedStream {
176 pub items: Vec<FeedItem>,
177}
178
179#[derive(Clone, Debug)]
180pub struct StreamMetadata {
181 pub source: NodeId,
182 pub started_at: StateTime,
183 pub data: Vec<u8>,
184}
185
186impl FeedItem {
187 fn decode(buf: &[u8]) -> Option<(Self, usize)> {
188 if buf.len() < 27 {
189 return None;
190 }
191
192 let id = MessageId(u64::from_le_bytes(buf[0..8].try_into().ok()?));
193 let author = NodeId::from_bytes(buf[8..16].try_into().ok()?);
194 let timestamp = StateTime::from_micros(i64::from_le_bytes(buf[16..24].try_into().ok()?));
195 let deleted = buf[24] != 0;
196 let mut offset = 25;
197
198 if buf.len() < offset + 2 {
199 return None;
200 }
201 let content_len = u16::from_le_bytes(buf[offset..offset + 2].try_into().ok()?) as usize;
202 offset += 2;
203
204 if buf.len() < offset + content_len {
205 return None;
206 }
207 let content = buf[offset..offset + content_len].to_vec();
208 offset += content_len;
209
210 Some((
211 FeedItem {
212 id,
213 author,
214 content,
215 timestamp,
216 deleted,
217 },
218 offset,
219 ))
220 }
221}
222
223impl FeedStream {
224 fn from_bytes(data: &[u8]) -> Self {
225 let mut stream = FeedStream::default();
226 let mut offset = 0;
227 while offset < data.len() {
228 let Some((item, used)) = FeedItem::decode(&data[offset..]) else {
229 break;
230 };
231 stream.apply_item(item);
232 offset += used;
233 }
234 stream
235 }
236
237 fn apply_item(&mut self, item: FeedItem) {
238 if let Some(existing) = self.items.iter_mut().find(|m| m.id == item.id) {
239 existing.deleted = item.deleted;
240 existing.content = item.content;
241 existing.timestamp = item.timestamp;
242 existing.author = item.author;
243 return;
244 }
245
246 let pos = self
247 .items
248 .binary_search_by(|m| m.timestamp.cmp(&item.timestamp))
249 .unwrap_or_else(|p| p);
250 self.items.insert(pos, item);
251 }
252}
253
254impl Default for NodeConfig {
255 fn default() -> Self {
256 NodeConfig {
257 tick_interval: Duration::from_millis(10),
258 max_packet_buffer: 1000,
259 max_outgoing_buffer: 1000,
260 max_local_events: 1000,
261 metrics: None,
262 observability: None, health_checks: None, }
265 }
266}
267
268impl NodeConfig {
269 pub fn init_health_checks(
334 &self,
335 node: Arc<Node>,
336 ) -> Option<(
337 Arc<crate::health::HealthChecker>,
338 Option<tokio::task::JoinHandle<Result<(), std::io::Error>>>,
339 )> {
340 use crate::health::{
341 ConnectionHealthCheck, HealthChecker, MemoryHealthCheck, StateDivergenceCheck,
342 TimeDriftCheck,
343 };
344 use crate::health_server::{HealthServer, HealthServerConfig};
345
346 let health_config = self.health_checks.as_ref()?;
348
349 health_config
351 .validate()
352 .expect("Invalid health check configuration");
353
354 if !health_config.enabled {
356 return None;
357 }
358
359 let mut checker = HealthChecker::new(health_config.cache_ttl);
361
362 if let Some(min_connections) = health_config.min_connections {
364 checker.add_check(Box::new(ConnectionHealthCheck::new(
365 node.clone(),
366 min_connections,
367 )));
368 }
369
370 if let Some(max_memory_mb) = health_config.max_memory_mb {
371 checker.add_check(Box::new(MemoryHealthCheck::new(max_memory_mb)));
372 }
373
374 if let Some(max_drift_ms) = health_config.max_time_drift_ms {
375 checker.add_check(Box::new(TimeDriftCheck::new(node.clone(), max_drift_ms)));
376 }
377
378 if let Some(max_pending_events) = health_config.max_pending_events {
379 checker.add_check(Box::new(StateDivergenceCheck::with_threshold(
380 node.clone(),
381 max_pending_events,
382 )));
383 }
384
385 let checker = Arc::new(checker);
386
387 let server_handle = if let Some(bind_address) = health_config.server_bind_address {
389 let server_config = HealthServerConfig { bind_address };
390 let server = HealthServer::new(checker.clone(), server_config);
391
392 let handle = tokio::spawn(async move {
394 server.serve().await
395 });
396
397 Some(handle)
398 } else {
399 None
400 };
401
402 Some((checker, server_handle))
403 }
404}
405
406pub struct Node {
408 identity: Identity,
410 session_id: Option<SessionId>,
412 time_engine: TimeEngine,
414 state_engine: ReconciliationEngine,
416 secure_processor: Option<SecureFrameProcessor>,
417 incoming: VecDeque<Frame>,
419 outgoing: VecDeque<Frame>,
421 local_events: Vec<Event>,
423 event_seq: u64,
425 config: NodeConfig,
427 stats: RuntimeStats,
428 stream_metadata: HashMap<u64, StreamMetadata>,
429 visual_buffers: HashMap<NodeId, VisualStateBuffer>,
430 visual_predictors: HashMap<NodeId, VisualPredictor>,
431 stream_visual_buffers: HashMap<u64, VisualStateBuffer>,
432 stream_visual_predictors: HashMap<u64, VisualPredictor>,
433 metrics: Option<NodeMetrics>,
435}
436
437impl Node {
438 pub fn new() -> Self {
440 let node = Self::with_config(NodeConfig::default());
441 tracing::info!(
442 node_id = node.node_id().0,
443 "Created new node"
444 );
445 node
446 }
447
448 pub fn with_config(config: NodeConfig) -> Self {
450 let metrics = config.metrics.clone();
451 Node {
452 identity: Identity::generate(),
453 session_id: None,
454 time_engine: TimeEngine::new(),
455 state_engine: ReconciliationEngine::new(),
456 secure_processor: None,
457 incoming: VecDeque::new(),
458 outgoing: VecDeque::new(),
459 local_events: Vec::new(),
460 event_seq: 0,
461 config,
462 stats: RuntimeStats::default(),
463 stream_metadata: HashMap::new(),
464 visual_buffers: HashMap::new(),
465 visual_predictors: HashMap::new(),
466 stream_visual_buffers: HashMap::new(),
467 stream_visual_predictors: HashMap::new(),
468 metrics,
469 }
470 }
471
472 pub fn with_identity(identity: Identity, config: NodeConfig) -> Self {
473 let metrics = config.metrics.clone();
474 Node {
475 identity,
476 session_id: None,
477 time_engine: TimeEngine::new(),
478 state_engine: ReconciliationEngine::new(),
479 secure_processor: None,
480 incoming: VecDeque::new(),
481 outgoing: VecDeque::new(),
482 local_events: Vec::new(),
483 event_seq: 0,
484 config,
485 stats: RuntimeStats::default(),
486 stream_metadata: HashMap::new(),
487 visual_buffers: HashMap::new(),
488 visual_predictors: HashMap::new(),
489 stream_visual_buffers: HashMap::new(),
490 stream_visual_predictors: HashMap::new(),
491 metrics,
492 }
493 }
494
495 pub fn node_id(&self) -> NodeId {
497 self.identity.node_id()
498 }
499
500 pub fn session_id(&self) -> Option<SessionId> {
502 self.session_id
503 }
504
505 pub fn join_session(&mut self, session_id: SessionId, session_key: [u8; 32]) {
507 let span = tracing::span!(
508 tracing::Level::INFO,
509 "join_session",
510 node_id = self.node_id().0,
511 session_id = session_id.0
512 );
513 let _enter = span.enter();
514
515 tracing::info!(
516 node_id = self.node_id().0,
517 session_id = session_id.0,
518 "Joining session"
519 );
520
521 self.session_id = Some(session_id);
522 self.secure_processor = Some(SecureFrameProcessor::new(
523 session_id,
524 self.node_id(),
525 session_key,
526 ));
527
528 if let Some(ref metrics) = self.metrics {
530 metrics.active_connections.inc();
531 metrics.total_connections.inc();
532 }
533 }
534
535 pub fn join_session_unsecured(&mut self, session_id: SessionId) {
536 let span = tracing::span!(
537 tracing::Level::INFO,
538 "join_session_unsecured",
539 node_id = self.node_id().0,
540 session_id = session_id.0
541 );
542 let _enter = span.enter();
543
544 tracing::warn!(
545 node_id = self.node_id().0,
546 session_id = session_id.0,
547 "Joining session without encryption (unsecured mode)"
548 );
549
550 self.session_id = Some(session_id);
551 self.secure_processor = None;
552
553 if let Some(ref metrics) = self.metrics {
555 metrics.active_connections.inc();
556 metrics.total_connections.inc();
557 }
558 }
559
560 pub fn leave_session(&mut self) {
562 let span = tracing::span!(
563 tracing::Level::INFO,
564 "leave_session",
565 node_id = self.node_id().0,
566 session_id = ?self.session_id
567 );
568 let _enter = span.enter();
569
570 if let Some(session_id) = self.session_id {
571 tracing::info!(
572 node_id = self.node_id().0,
573 session_id = session_id.0,
574 "Leaving session"
575 );
576
577 if let Some(ref metrics) = self.metrics {
579 metrics.active_connections.dec();
580 }
581 }
582
583 self.session_id = None;
584 self.secure_processor = None;
585 }
586
587 pub fn queue_incoming(&mut self, frame: Frame) {
589 if self.incoming.len() < self.config.max_packet_buffer {
590 self.incoming.push_back(frame);
591 self.stats.incoming_queued += 1;
592 } else {
593 tracing::warn!(
594 node_id = self.node_id().0,
595 buffer_size = self.incoming.len(),
596 max_buffer = self.config.max_packet_buffer,
597 "Incoming packet buffer full, dropping frame"
598 );
599
600 if let Some(ref metrics) = self.metrics {
602 metrics.messages_dropped.inc();
603 }
604 }
605 }
606
607 pub fn pop_outgoing(&mut self) -> Option<Frame> {
609 let frame = self.outgoing.pop_front();
610 if frame.is_some() {
611 self.stats.outgoing_popped += 1;
612 self.stats.packets_out += 1;
613 }
614 frame
615 }
616
617 pub fn queue_local_event(&mut self, event: Event) {
619 if self.local_events.len() < self.config.max_local_events {
620 self.local_events.push(event);
621 self.stats.local_events_queued += 1;
622 }
623 }
624
625 pub fn queue_visual_keyframe(&mut self, state: &VisualState) {
626 self.queue_visual_event(
627 visual_state_id(state.source),
628 state,
629 EventType::VisualKeyframe,
630 );
631 }
632
633 pub fn queue_visual_delta(&mut self, state: &VisualState) {
634 self.queue_visual_event(visual_state_id(state.source), state, EventType::VisualDelta);
635 }
636
637 pub fn queue_stream_visual_keyframe(&mut self, stream_id: u64, state: &VisualState) {
638 self.queue_visual_event(
639 stream_visual_state_id(stream_id),
640 state,
641 EventType::VisualKeyframe,
642 );
643 }
644
645 pub fn queue_stream_visual_delta(&mut self, stream_id: u64, state: &VisualState) {
646 self.queue_visual_event(
647 stream_visual_state_id(stream_id),
648 state,
649 EventType::VisualDelta,
650 );
651 }
652
653 pub fn queue_stream_start(&mut self, stream_id: u64, metadata: Vec<u8>, timestamp: StateTime) {
654 self.stream_metadata.insert(
655 stream_id,
656 StreamMetadata {
657 source: self.node_id(),
658 started_at: timestamp,
659 data: metadata.clone(),
660 },
661 );
662 let target_state = livestream_state_id(stream_id);
663 let seq = self.next_event_seq();
664 let time_intent = self.time_intent_for(timestamp);
665 let event = Event::new(
666 self.node_id(),
667 seq,
668 EventType::StreamStart,
669 target_state,
670 MutationOp::Set(metadata),
671 )
672 .with_time_intent(time_intent);
673 self.queue_local_event(event);
674 }
675
676 pub fn queue_stream_end(&mut self, stream_id: u64, timestamp: StateTime) {
677 self.stream_metadata.remove(&stream_id);
678 self.stream_visual_buffers.remove(&stream_id);
679 self.stream_visual_predictors.remove(&stream_id);
680 let target_state = livestream_state_id(stream_id);
681 let seq = self.next_event_seq();
682 let time_intent = self.time_intent_for(timestamp);
683 let event = Event::new(
684 self.node_id(),
685 seq,
686 EventType::StreamEnd,
687 target_state,
688 MutationOp::Delete,
689 )
690 .with_time_intent(time_intent);
691 self.queue_local_event(event);
692
693 let visual_state = stream_visual_state_id(stream_id);
694 let seq = self.next_event_seq();
695 let event = Event::new(
696 self.node_id(),
697 seq,
698 EventType::StreamEnd,
699 visual_state,
700 MutationOp::Delete,
701 )
702 .with_time_intent(time_intent);
703 self.queue_local_event(event);
704 }
705
706 pub fn queue_feed_append(&mut self, feed_state: StateId, data: Vec<u8>, timestamp: StateTime) {
707 let seq = self.next_event_seq();
708 let time_intent = self.time_intent_for(timestamp);
709 let event = Event::new(
710 self.node_id(),
711 seq,
712 EventType::FeedAppend,
713 feed_state,
714 MutationOp::Append(data),
715 )
716 .with_time_intent(time_intent);
717 self.queue_local_event(event);
718 }
719
720 pub fn queue_feed_delete(&mut self, feed_state: StateId, timestamp: StateTime) {
721 let seq = self.next_event_seq();
722 let time_intent = self.time_intent_for(timestamp);
723 let event = Event::new(
724 self.node_id(),
725 seq,
726 EventType::FeedDelete,
727 feed_state,
728 MutationOp::Delete,
729 )
730 .with_time_intent(time_intent);
731 self.queue_local_event(event);
732 }
733
734 pub fn tick(&mut self) {
737 let span = tracing::span!(
738 tracing::Level::INFO,
739 "node_tick",
740 node_id = self.node_id().0,
741 session_id = ?self.session_id
742 );
743 let _enter = span.enter();
744
745 let start = Instant::now();
746 self.stats.ticks += 1;
747
748 self.time_engine.tick();
750
751 let packets = self.ingest_packets();
753
754 let validated = self.decrypt_and_validate(packets);
756
757 let classify_start = Instant::now();
759 let events = self.classify_events(validated);
760
761 if let Some(ref metrics) = self.metrics {
763 let latency_ms = classify_start.elapsed().as_secs_f64() * 1000.0;
764 if !events.is_empty() {
765 metrics.message_latency_ms.observe(latency_ms);
766 }
767 }
768
769 self.update_time_model(&events);
771
772 let reconcile_start = Instant::now();
774 let reconcile_result = {
775 let span = tracing::span!(
776 tracing::Level::DEBUG,
777 "state_reconciliation",
778 node_id = self.node_id().0
779 );
780 let _enter = span.enter();
781
782 let result = self.state_engine.process_events(events, &self.time_engine);
783 self.state_engine.control_divergence();
784
785 tracing::debug!(
786 applied = result.applied,
787 rejected = result.rejected,
788 "State reconciliation complete"
789 );
790 result
791 };
792
793 if let Some(ref metrics) = self.metrics {
795 let sync_latency_ms = reconcile_start.elapsed().as_secs_f64() * 1000.0;
796 metrics.state_sync_latency_ms.observe(sync_latency_ms);
797 }
798
799 if let Some(ref metrics) = self.metrics {
801 let quarantine_size = self.state_engine.field().quarantine_size();
803 metrics.quarantine_buffer_size.set(quarantine_size as i64);
804
805 if reconcile_result.rejected > 0 {
807 metrics.messages_dropped.inc_by(reconcile_result.rejected as u64);
808 }
809 }
810
811 if let Some(ref metrics) = self.metrics {
813 let max_offset_ms = self.time_engine
814 .network()
815 .peers
816 .values()
817 .map(|peer| peer.offset.abs() * 1000.0) .fold(0.0f64, f64::max);
819
820 if max_offset_ms > 0.0 {
821 metrics.time_drift_ms.set(max_offset_ms as i64);
822 }
823 }
824
825 self.generate_predictions();
827
828 let authorized = self.authorize_and_sign();
833
834 self.build_packets(authorized);
836
837 self.stats.last_tick_duration = start.elapsed();
840 }
841
842 fn ingest_packets(&mut self) -> Vec<Frame> {
844 let span = tracing::span!(
845 tracing::Level::DEBUG,
846 "ingest_packets",
847 node_id = self.node_id().0
848 );
849 let _enter = span.enter();
850
851 let packets: Vec<Frame> = self.incoming.drain(..).collect();
852 self.stats.packets_in += packets.len() as u64;
853
854 if let Some(ref metrics) = self.metrics {
856 metrics.messages_received.inc_by(packets.len() as u64);
857 }
858
859 tracing::debug!(packet_count = packets.len(), "Ingested packets");
860 packets
861 }
862
863 fn decrypt_and_validate(&mut self, packets: Vec<Frame>) -> Vec<Frame> {
865 let span = tracing::span!(
866 tracing::Level::DEBUG,
867 "decrypt_and_validate",
868 node_id = self.node_id().0,
869 packet_count = packets.len()
870 );
871 let _enter = span.enter();
872
873 let Some(processor) = self.secure_processor.as_mut() else {
874 tracing::debug!("No secure processor, skipping decryption");
875 return packets;
876 };
877
878 let initial_count = packets.len();
879 let validated: Vec<Frame> = packets
880 .into_iter()
881 .filter_map(|frame| {
882 let data = frame.serialize().ok()?;
883 let decrypted = processor.decrypt_frame(&data).ok()?;
884 let auth_tag = [0u8; AUTH_TAG_SIZE];
885 Some(Frame {
886 header: decrypted.header,
887 extensions: decrypted.extensions,
888 payload: decrypted.payload,
889 auth_tag,
890 })
891 })
892 .collect();
893
894 let failed_count = initial_count - validated.len();
895 if failed_count > 0 {
896 tracing::warn!(
897 node_id = self.node_id().0,
898 failed_count = failed_count,
899 validated_count = validated.len(),
900 "Some frames failed decryption/validation"
901 );
902
903 if let Some(ref metrics) = self.metrics {
905 metrics.failed_connections.inc_by(failed_count as u64);
906 }
907 }
908
909 tracing::debug!(
910 validated_count = validated.len(),
911 failed_count = failed_count,
912 "Decryption and validation complete"
913 );
914 validated
915 }
916
917 fn classify_events(&mut self, packets: Vec<Frame>) -> Vec<Event> {
919 let span = tracing::span!(
920 tracing::Level::DEBUG,
921 "classify_events",
922 node_id = self.node_id().0,
923 packet_count = packets.len()
924 );
925 let _enter = span.enter();
926
927 let mut events = Vec::new();
928
929 for frame in packets {
930 let source = frame.header.node_id;
931 let time_hint = frame.header.time_hint;
932 let packet_class = frame.header.class;
933
934 if let Some(ref metrics) = self.metrics {
936 metrics.message_size_bytes.observe(frame.payload.len() as f64);
937 }
938
939 let frame_events = self.decode_event_blocks(&frame.payload, source, time_hint);
940 tracing::trace!(
941 source = source.0,
942 event_count = frame_events.len(),
943 packet_class = ?packet_class,
944 "Decoded events from frame"
945 );
946
947 for event in &frame_events {
948 self.handle_event_side_effects(event);
949 }
950 events.extend(frame_events);
951 }
952
953 tracing::debug!(event_count = events.len(), "Event classification complete");
954 events
955 }
956
957 fn handle_event_side_effects(&mut self, event: &Event) {
958 match event.event_type {
959 EventType::StreamStart => {
960 let stream_id = event.target_state.instance();
961 let started_at = event.absolute_time(self.time_engine.tau_s());
962
963 if let MutationOp::Set(data) = &event.mutation {
964 self.stream_metadata.insert(
965 stream_id,
966 StreamMetadata {
967 source: event.source,
968 started_at,
969 data: data.clone(),
970 },
971 );
972 }
973
974 let field = self.state_engine_mut().field_mut();
975 if field.get(event.target_state).is_none() {
976 field.create_atom(
977 event.target_state,
978 elara_core::StateType::Core,
979 event.source,
980 );
981 }
982
983 let visual_state = stream_visual_state_id(stream_id);
984 if field.get(visual_state).is_none() {
985 field.create_atom(
986 visual_state,
987 elara_core::StateType::Perceptual,
988 event.source,
989 );
990 }
991 }
992 EventType::StreamEnd => {
993 let stream_id = event.target_state.instance();
994 self.stream_metadata.remove(&stream_id);
995 self.stream_visual_buffers.remove(&stream_id);
996 self.stream_visual_predictors.remove(&stream_id);
997
998 let field = self.state_engine_mut().field_mut();
999 field.remove(livestream_state_id(stream_id));
1000 field.remove(stream_visual_state_id(stream_id));
1001 }
1002 EventType::VisualKeyframe | EventType::VisualDelta => {
1003 if let MutationOp::Set(data) = &event.mutation {
1004 if let Ok(state) = VisualEncoder::decode(data) {
1005 let stream_id = if self
1006 .stream_metadata
1007 .contains_key(&event.target_state.instance())
1008 {
1009 Some(event.target_state.instance())
1010 } else {
1011 None
1012 };
1013 self.update_visual_state(state, stream_id);
1014 }
1015 }
1016 }
1017 _ => {}
1018 }
1019 }
1020
1021 fn update_time_model(&mut self, events: &[Event]) {
1023 let span = tracing::span!(
1024 tracing::Level::DEBUG,
1025 "update_time_model",
1026 node_id = self.node_id().0,
1027 event_count = events.len()
1028 );
1029 let _enter = span.enter();
1030
1031 let reference = self.time_engine.tau_s();
1032 for event in events {
1033 let remote_time = event.time_intent.to_absolute(reference);
1034 let seq = (event.id.seq & 0xFFFF) as u16;
1035 self.time_engine
1036 .update_from_packet(event.source, remote_time, seq);
1037 }
1038
1039 tracing::debug!("Time model updated");
1040 }
1041
1042 fn generate_predictions(&mut self) {
1044 let dt_us = self.config.tick_interval.as_micros() as u64;
1045 {
1046 let field = self.state_engine.field_mut();
1047 for atom in field.atoms.values_mut() {
1048 atom.entropy.time_since_actual =
1049 atom.entropy.time_since_actual.saturating_add(dt_us);
1050 }
1051 }
1052
1053 let needs_prediction = self.state_engine.field().atoms_needing_prediction(100);
1054 if needs_prediction.is_empty() {
1055 return;
1056 }
1057
1058 let field = self.state_engine.field_mut();
1059 for state_id in needs_prediction {
1060 if let Some(atom) = field.atoms.get_mut(&state_id) {
1061 let increase = match atom.state_type {
1062 elara_core::StateType::Core => 0.01,
1063 elara_core::StateType::Perceptual => 0.03,
1064 elara_core::StateType::Enhancement => 0.05,
1065 elara_core::StateType::Cosmetic => 0.07,
1066 };
1067 atom.entropy.increase(increase);
1068 }
1069 }
1070 }
1071
1072 fn authorize_and_sign(&mut self) -> Vec<Event> {
1074 let span = tracing::span!(
1075 tracing::Level::DEBUG,
1076 "authorize_and_sign",
1077 node_id = self.node_id().0,
1078 event_count = self.local_events.len()
1079 );
1080 let _enter = span.enter();
1081
1082 let events: Vec<Event> = self.local_events.drain(..).collect();
1083 self.stats.events_signed += events.len() as u64;
1084
1085 let signed_events = events
1086 .into_iter()
1087 .map(|mut event| {
1088 let signature = self.identity.sign(&event.mutation.encode());
1090 event.authority_proof.signature = signature;
1091 event
1092 })
1093 .collect();
1094
1095 tracing::debug!("Events authorized and signed");
1096 signed_events
1097 }
1098
1099 fn build_packets(&mut self, _events: Vec<Event>) {
1101 let span = tracing::span!(
1102 tracing::Level::DEBUG,
1103 "build_packets",
1104 node_id = self.node_id().0,
1105 event_count = _events.len()
1106 );
1107 let _enter = span.enter();
1108
1109 let Some(processor) = self.secure_processor.as_mut() else {
1110 self.build_plain_packets(_events);
1111 return;
1112 };
1113
1114 let mut packets_built = 0;
1115 for event in _events {
1116 if self.outgoing.len() >= self.config.max_outgoing_buffer {
1117 if let Some(ref metrics) = self.metrics {
1119 metrics.messages_dropped.inc();
1120 }
1121 tracing::warn!("Outgoing buffer full, dropping messages");
1122 break;
1123 }
1124
1125 let class = Self::class_for_event(&event);
1126 let profile = Self::profile_for_event(&event);
1127 let time_hint = event.time_intent.ts_offset();
1128 let payload = Self::encode_event_block(&event);
1129
1130 if let Ok(bytes) =
1131 processor.encrypt_frame(class, profile, time_hint, Extensions::new(), &payload)
1132 {
1133 if let Ok(frame) = Frame::parse(&bytes) {
1134 self.outgoing.push_back(frame);
1135 packets_built += 1;
1136
1137 if let Some(ref metrics) = self.metrics {
1139 metrics.messages_sent.inc();
1140 metrics.message_size_bytes.observe(bytes.len() as f64);
1141 }
1142 }
1143 }
1144 }
1145
1146 tracing::debug!(packets_built = packets_built, "Packets built");
1147 }
1148
1149 fn build_plain_packets(&mut self, events: Vec<Event>) {
1150 let span = tracing::span!(
1151 tracing::Level::DEBUG,
1152 "build_plain_packets",
1153 node_id = self.node_id().0,
1154 event_count = events.len()
1155 );
1156 let _enter = span.enter();
1157
1158 let mut packets_built = 0;
1159 for event in events {
1160 if self.outgoing.len() >= self.config.max_outgoing_buffer {
1161 if let Some(ref metrics) = self.metrics {
1163 metrics.messages_dropped.inc();
1164 }
1165 tracing::warn!("Outgoing buffer full, dropping messages");
1166 break;
1167 }
1168
1169 let class = Self::class_for_event(&event);
1170 let profile = Self::profile_for_event(&event);
1171 let time_hint = event.time_intent.ts_offset();
1172 let payload = Self::encode_event_block(&event);
1173
1174 let session_id = self.session_id.unwrap_or(SessionId::ZERO);
1175 let mut header = FixedHeader::new(session_id, self.node_id());
1176 header.class = class;
1177 header.profile = profile;
1178 header.time_hint = time_hint;
1179
1180 let frame = FrameBuilder::new(header).payload(payload.clone()).build();
1181 self.outgoing.push_back(frame);
1182 packets_built += 1;
1183
1184 if let Some(ref metrics) = self.metrics {
1186 metrics.messages_sent.inc();
1187 metrics.message_size_bytes.observe(payload.len() as f64);
1188 }
1189 }
1190
1191 tracing::debug!(packets_built = packets_built, "Plain packets built");
1192 }
1193
1194 fn decode_event_blocks(&self, payload: &[u8], source: NodeId, time_hint: i32) -> Vec<Event> {
1195 let mut events = Vec::new();
1196 let mut offset = 0;
1197
1198 while payload.len().saturating_sub(offset) >= 13 {
1199 let event_type = match EventType::from_byte(payload[offset]) {
1200 Some(t) => t,
1201 None => break,
1202 };
1203 offset += 1;
1204
1205 let state_end = offset + 8;
1206 if state_end > payload.len() {
1207 break;
1208 }
1209 let state_id = match payload[offset..state_end].try_into() {
1210 Ok(bytes) => StateId::from_bytes(bytes),
1211 Err(_) => break,
1212 };
1213 offset = state_end;
1214
1215 let version_len_end = offset + 2;
1216 if version_len_end > payload.len() {
1217 break;
1218 }
1219 let version_len = match payload[offset..version_len_end].try_into() {
1220 Ok(bytes) => u16::from_le_bytes(bytes) as usize,
1221 Err(_) => break,
1222 };
1223 offset = version_len_end;
1224
1225 let version_end = offset + version_len;
1226 if version_end > payload.len() {
1227 break;
1228 }
1229 let version_ref = match Self::decode_version_vector(&payload[offset..version_end]) {
1230 Some(v) => v,
1231 None => break,
1232 };
1233 offset = version_end;
1234
1235 let delta_len_end = offset + 2;
1236 if delta_len_end > payload.len() {
1237 break;
1238 }
1239 let delta_len = match payload[offset..delta_len_end].try_into() {
1240 Ok(bytes) => u16::from_le_bytes(bytes) as usize,
1241 Err(_) => break,
1242 };
1243 offset = delta_len_end;
1244
1245 let delta_end = offset + delta_len;
1246 if delta_end > payload.len() {
1247 break;
1248 }
1249 let delta = &payload[offset..delta_end];
1250 let (mutation, used) = match MutationOp::decode(delta) {
1251 Some(decoded) => decoded,
1252 None => break,
1253 };
1254 if used != delta_len {
1255 break;
1256 }
1257 offset = delta_end;
1258
1259 let seq = version_ref.get(source).saturating_add(1);
1260 let event = Event::new(source, seq, event_type, state_id, mutation)
1261 .with_version(version_ref)
1262 .with_time_intent(TimeIntent::new(time_hint));
1263 events.push(event);
1264 }
1265
1266 events
1267 }
1268
1269 fn encode_event_block(event: &Event) -> Vec<u8> {
1270 let mut buf = Vec::new();
1271 buf.push(event.event_type.to_byte());
1272 buf.extend_from_slice(&event.target_state.to_bytes());
1273
1274 let version = Self::encode_version_vector(&event.version_ref);
1275 buf.extend_from_slice(&(version.len() as u16).to_le_bytes());
1276 buf.extend_from_slice(&version);
1277
1278 let delta = event.mutation.encode();
1279 buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
1280 buf.extend_from_slice(&delta);
1281
1282 buf
1283 }
1284
1285 fn encode_version_vector(version: &VersionVector) -> Vec<u8> {
1286 let mut entries = version.to_compact();
1287 entries.sort_by_key(|(node, _)| node.0);
1288 let mut buf = Vec::with_capacity(entries.len() * 16);
1289 for (node, count) in entries {
1290 buf.extend_from_slice(&node.to_bytes());
1291 buf.extend_from_slice(&count.to_le_bytes());
1292 }
1293 buf
1294 }
1295
1296 fn decode_version_vector(buf: &[u8]) -> Option<VersionVector> {
1297 if !buf.len().is_multiple_of(16) {
1298 return None;
1299 }
1300 let mut entries = Vec::new();
1301 for chunk in buf.chunks_exact(16) {
1302 let node = match chunk[0..8].try_into() {
1303 Ok(bytes) => NodeId::from_bytes(bytes),
1304 Err(_) => return None,
1305 };
1306 let count = match chunk[8..16].try_into() {
1307 Ok(bytes) => u64::from_le_bytes(bytes),
1308 Err(_) => return None,
1309 };
1310 entries.push((node, count));
1311 }
1312 Some(VersionVector::from_compact(entries))
1313 }
1314
1315 fn class_for_event(event: &Event) -> PacketClass {
1316 match event.event_type {
1317 EventType::StateRequest | EventType::StateResponse | EventType::GapFill => {
1318 PacketClass::Repair
1319 }
1320 EventType::VoiceFrame | EventType::VoiceMute => PacketClass::Perceptual,
1321 EventType::TypingStart | EventType::TypingStop | EventType::PresenceUpdate => {
1322 PacketClass::Perceptual
1323 }
1324 EventType::VisualKeyframe | EventType::VisualDelta => PacketClass::Perceptual,
1325 EventType::TextAppend
1326 | EventType::TextEdit
1327 | EventType::TextDelete
1328 | EventType::TextReact => PacketClass::Core,
1329 EventType::FeedAppend | EventType::FeedDelete => PacketClass::Core,
1330 EventType::StreamStart | EventType::StreamEnd => PacketClass::Core,
1331 _ => PacketClass::Core,
1332 }
1333 }
1334
1335 fn profile_for_event(event: &Event) -> RepresentationProfile {
1336 match event.event_type {
1337 EventType::VoiceFrame | EventType::VoiceMute => RepresentationProfile::VoiceMinimal,
1338 EventType::VisualKeyframe | EventType::VisualDelta => {
1339 RepresentationProfile::VideoStandard
1340 }
1341 EventType::StreamStart | EventType::StreamEnd => {
1342 RepresentationProfile::StreamAsymmetric
1343 }
1344 _ => RepresentationProfile::Textual,
1345 }
1346 }
1347
1348 fn queue_visual_event(
1349 &mut self,
1350 target_state: StateId,
1351 state: &VisualState,
1352 event_type: EventType,
1353 ) {
1354 let seq = self.next_event_seq();
1355 let time_intent = self.time_intent_for(state.timestamp);
1356 let payload = VisualEncoder::encode(state);
1357 let event = Event::new(
1358 self.node_id(),
1359 seq,
1360 event_type,
1361 target_state,
1362 MutationOp::Set(payload),
1363 )
1364 .with_time_intent(time_intent);
1365 self.queue_local_event(event);
1366 }
1367
1368 fn time_intent_for(&self, timestamp: StateTime) -> TimeIntent {
1369 let reference = self.time_engine.tau_s();
1370 let offset = timestamp.to_wire_offset(reference);
1371 TimeIntent::new(offset)
1372 }
1373
1374 fn visual_buffer_config(&self) -> (usize, u32) {
1375 let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
1376 let instability = 1.0 - stability;
1377 let buffer_size = (24.0 + instability * 40.0).round() as usize;
1378 let buffer_delay_ms = (40.0 + instability * 140.0).round() as u32;
1379 (buffer_size.clamp(16, 64), buffer_delay_ms.clamp(30, 200))
1380 }
1381
1382 fn visual_predictor_config(&self) -> PredictionConfig {
1383 let stability = self.time_engine.stability_score().clamp(0.1, 1.0);
1384 let instability = (1.0 - stability) as f32;
1385 PredictionConfig {
1386 max_horizon_ms: ((400.0 + instability as f64 * 800.0).round() as u32).clamp(300, 1200),
1387 confidence_decay: (0.08 + instability * 0.12).clamp(0.06, 0.2),
1388 min_confidence: (0.25 + instability * 0.2).clamp(0.2, 0.5),
1389 ..PredictionConfig::default()
1390 }
1391 }
1392
1393 fn update_visual_state(&mut self, state: VisualState, stream_id: Option<u64>) {
1394 let (buffer_size, buffer_delay_ms) = self.visual_buffer_config();
1395 let predictor_config = self.visual_predictor_config();
1396
1397 let buffer = self
1398 .visual_buffers
1399 .entry(state.source)
1400 .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
1401 buffer.push(state.clone());
1402 let predictor = self
1403 .visual_predictors
1404 .entry(state.source)
1405 .or_insert_with(|| VisualPredictor::new(predictor_config.clone()));
1406 predictor.update(state.clone());
1407
1408 if let Some(stream_id) = stream_id {
1409 let stream_buffer = self
1410 .stream_visual_buffers
1411 .entry(stream_id)
1412 .or_insert_with(|| VisualStateBuffer::new(buffer_size, buffer_delay_ms));
1413 stream_buffer.push(state.clone());
1414 let stream_predictor = self
1415 .stream_visual_predictors
1416 .entry(stream_id)
1417 .or_insert_with(|| VisualPredictor::new(predictor_config));
1418 stream_predictor.update(state);
1419 }
1420 }
1421
1422 pub fn visual_state(&self, node_id: NodeId) -> Option<VisualState> {
1423 let atom = self.state_engine.field().get(visual_state_id(node_id))?;
1424 VisualEncoder::decode(&atom.value).ok()
1425 }
1426
1427 pub fn visual_state_at(
1428 &mut self,
1429 node_id: NodeId,
1430 target_time: StateTime,
1431 ) -> Option<VisualState> {
1432 let state = self
1433 .visual_buffers
1434 .get(&node_id)
1435 .and_then(|buffer| buffer.get_at(target_time));
1436 if state.is_some() {
1437 return state;
1438 }
1439 self.visual_predictors
1440 .get_mut(&node_id)
1441 .and_then(|predictor| predictor.predict(target_time))
1442 }
1443
1444 pub fn visual_state_now(&mut self, node_id: NodeId) -> Option<VisualState> {
1445 let now = self.time_engine.tau_s();
1446 self.visual_state_at(node_id, now)
1447 }
1448
1449 pub fn stream_visual_state(&self, stream_id: u64) -> Option<VisualState> {
1450 let atom = self
1451 .state_engine
1452 .field()
1453 .get(stream_visual_state_id(stream_id))?;
1454 VisualEncoder::decode(&atom.value).ok()
1455 }
1456
1457 pub fn stream_visual_state_at(
1458 &mut self,
1459 stream_id: u64,
1460 target_time: StateTime,
1461 ) -> Option<VisualState> {
1462 let state = self
1463 .stream_visual_buffers
1464 .get(&stream_id)
1465 .and_then(|buffer| buffer.get_at(target_time));
1466 if state.is_some() {
1467 return state;
1468 }
1469 self.stream_visual_predictors
1470 .get_mut(&stream_id)
1471 .and_then(|predictor| predictor.predict(target_time))
1472 }
1473
1474 pub fn stream_visual_state_now(&mut self, stream_id: u64) -> Option<VisualState> {
1475 let now = self.time_engine.tau_s();
1476 self.stream_visual_state_at(stream_id, now)
1477 }
1478
1479 pub fn feed_stream(&self, feed_state: StateId) -> FeedStream {
1480 self.state_engine
1481 .field()
1482 .get(feed_state)
1483 .map(|atom| FeedStream::from_bytes(&atom.value))
1484 .unwrap_or_default()
1485 }
1486
1487 pub fn stream_metadata(&self, stream_id: u64) -> Option<&StreamMetadata> {
1488 self.stream_metadata.get(&stream_id)
1489 }
1490
1491 pub fn time_engine(&self) -> &TimeEngine {
1493 &self.time_engine
1494 }
1495
1496 pub fn state_engine(&self) -> &ReconciliationEngine {
1498 &self.state_engine
1499 }
1500
1501 pub fn state_engine_mut(&mut self) -> &mut ReconciliationEngine {
1503 &mut self.state_engine
1504 }
1505
1506 pub fn stats(&self) -> &RuntimeStats {
1507 &self.stats
1508 }
1509
1510 pub fn in_session(&self) -> bool {
1512 self.session_id.is_some()
1513 }
1514
1515 pub fn next_event_seq(&mut self) -> u64 {
1517 let seq = self.event_seq;
1518 self.event_seq += 1;
1519 seq
1520 }
1521}
1522
1523impl Default for Node {
1524 fn default() -> Self {
1525 Self::new()
1526 }
1527}
1528
1529#[cfg(test)]
1530mod tests {
1531 use super::*;
1532 use elara_core::{PacketClass, RepresentationProfile};
1533 use elara_msp::text::{feed_stream_id as feed_id, FeedItem as MspFeedItem};
1534
/// A freshly constructed node must not report being in a session.
#[test]
fn test_node_creation() {
    let node: Node = Node::new();

    assert!(!node.in_session());
}
1540
/// Smoke test: ticking a fresh node several times must not panic.
#[test]
fn test_node_tick() {
    let mut node = Node::new();

    for _ in 0..3 {
        node.tick();
    }
}
1550
/// With `max_local_events` set to 1, queueing two local events must leave
/// the `local_events_queued` counter at 1.
#[test]
fn test_local_event_buffer_limit() {
    /// Builds a `TextAppend` event from node 1 on state 1 with the given
    /// sequence number and appended bytes.
    fn text_append(seq: u64, bytes: &[u8]) -> Event {
        Event::new(
            NodeId::new(1),
            seq,
            EventType::TextAppend,
            StateId::new(1),
            MutationOp::Append(bytes.to_vec()),
        )
    }

    let mut node = Node::with_config(NodeConfig {
        max_local_events: 1,
        ..Default::default()
    });

    let event_a = text_append(1, b"a");
    let event_b = text_append(2, b"b");

    node.queue_local_event(event_a);
    node.queue_local_event(event_b);

    assert_eq!(node.stats().local_events_queued, 1);
}
1579
/// After one tick, an atom whose `entropy.time_since_actual` was reset to
/// zero must report a value greater than zero.
#[test]
fn test_prediction_entropy_advances() {
    let mut node = Node::new();
    let state_id = StateId::new(42);
    let node_id = node.node_id();

    // Create the atom and zero its entropy timer in one statement so the
    // mutable borrow of the field ends before the tick.
    node.state_engine_mut()
        .field_mut()
        .create_atom(state_id, elara_core::StateType::Core, node_id)
        .entropy
        .time_since_actual = 0;

    node.tick();

    let atom = node.state_engine().field().get(state_id).unwrap();
    assert!(atom.entropy.time_since_actual > 0);
}
1598
/// Joining a session makes the node report that session; leaving it
/// returns the node to the "not in session" state.
#[test]
fn test_node_session() {
    let session_id = SessionId::new(12345);
    let mut node = Node::new();

    node.join_session(session_id, [0x42u8; 32]);
    assert!(node.in_session());
    assert_eq!(node.session_id(), Some(session_id));

    node.leave_session();
    assert!(!node.in_session());
}
1612
1613 fn build_payload(event_type: EventType, state_id: StateId, mutation: MutationOp) -> Vec<u8> {
1614 let mut buf = Vec::new();
1615 buf.push(event_type.to_byte());
1616 buf.extend_from_slice(&state_id.to_bytes());
1617
1618 buf.extend_from_slice(&(0u16).to_le_bytes());
1620
1621 let delta = mutation.encode();
1622 buf.extend_from_slice(&(delta.len() as u16).to_le_bytes());
1623 buf.extend_from_slice(&delta);
1624
1625 buf
1626 }
1627
1628 fn incoming_frame_for(
1629 session: SessionId,
1630 source: NodeId,
1631 class: PacketClass,
1632 profile: RepresentationProfile,
1633 time_hint: i32,
1634 payload: Vec<u8>,
1635 ) -> Frame {
1636 let mut header = FixedHeader::new(session, source);
1637 header.class = class;
1638 header.profile = profile;
1639 header.time_hint = time_hint;
1640 FrameBuilder::new(header).payload(payload).build()
1641 }
1642
/// An incoming `StreamStart` frame must, after one tick, leave both the
/// livestream atom and the stream visual atom in the field and register
/// stream metadata.
#[test]
fn test_stream_start_side_effects_on_incoming_frame() {
    let stream_id = 42u64;
    let mut node = Node::new();

    let start_payload = build_payload(
        EventType::StreamStart,
        livestream_state_id(stream_id),
        MutationOp::Set(vec![1, 2, 3]),
    );
    node.queue_incoming(incoming_frame_for(
        SessionId::new(1),
        NodeId::new(9000),
        PacketClass::Core,
        RepresentationProfile::StreamAsymmetric,
        0,
        start_payload,
    ));
    node.tick();

    let field = node.state_engine().field();
    assert!(field.contains(livestream_state_id(stream_id)));
    assert!(field.contains(stream_visual_state_id(stream_id)));
    assert!(node.stream_metadata(stream_id).is_some());
}
1672
/// A `StreamStart` followed by a `StreamEnd` for the same stream must
/// leave neither stream atom in the field and no stream metadata.
#[test]
fn test_stream_end_removes_atoms() {
    /// Frame from node 7000 in session 1 carrying one stream lifecycle
    /// event that targets the livestream state of `stream_id`.
    fn lifecycle_frame(stream_id: u64, event_type: EventType, mutation: MutationOp) -> Frame {
        let payload = build_payload(event_type, livestream_state_id(stream_id), mutation);
        incoming_frame_for(
            SessionId::new(1),
            NodeId::new(7000),
            PacketClass::Core,
            RepresentationProfile::StreamAsymmetric,
            0,
            payload,
        )
    }

    let stream_id = 99u64;
    let mut node = Node::new();

    // Start the stream and let the node process it.
    node.queue_incoming(lifecycle_frame(
        stream_id,
        EventType::StreamStart,
        MutationOp::Set(vec![9, 9, 9]),
    ));
    node.tick();

    // End the stream and process again.
    node.queue_incoming(lifecycle_frame(
        stream_id,
        EventType::StreamEnd,
        MutationOp::Delete,
    ));
    node.tick();

    let field = node.state_engine().field();
    assert!(!field.contains(livestream_state_id(stream_id)));
    assert!(!field.contains(stream_visual_state_id(stream_id)));
    assert!(node.stream_metadata(stream_id).is_none());
}
1717
/// A feed item appended through an incoming `FeedAppend` frame must be
/// readable back from `Node::feed_stream` with id, author and content
/// intact and not marked deleted.
#[test]
fn test_feed_append_roundtrip() {
    let feed_state = feed_id(7);
    let mut node = Node::new();

    let encoded_item = MspFeedItem::new(
        elara_core::MessageId(1),
        NodeId::new(123),
        b"hello feed".to_vec(),
        elara_core::StateTime::from_millis(0),
    )
    .encode();

    let append_payload = build_payload(
        EventType::FeedAppend,
        feed_state,
        MutationOp::Append(encoded_item),
    );
    node.queue_incoming(incoming_frame_for(
        SessionId::new(1),
        NodeId::new(5000),
        PacketClass::Core,
        RepresentationProfile::Textual,
        0,
        append_payload,
    ));
    node.tick();

    let stream = node.feed_stream(feed_state);
    assert_eq!(stream.items.len(), 1);
    let first = &stream.items[0];
    assert_eq!(first.id.0, 1);
    assert_eq!(first.author, NodeId::new(123));
    assert_eq!(first.content, b"hello feed".to_vec());
    assert!(!first.deleted);
}
1757}