1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::time::{Duration, Instant, SystemTime};
6use thiserror::Error;
7use tracing::{debug, error, warn};
8use uuid::Uuid;
9
10use crate::message::{HandshakeType, Message, MessageType, ProtocolVersion};
11
12#[derive(Debug, Error)]
14pub enum StateError {
15 #[error("Invalid state transition from {from:?} to {to:?}")]
17 InvalidTransition {
18 from: ProtocolState,
19 to: ProtocolState,
20 },
21
22 #[error("State synchronization failed: {reason}")]
24 SyncFailed { reason: String },
25
26 #[error("Invalid state data: {reason}")]
28 InvalidData { reason: String },
29
30 #[error("State operation timed out after {timeout:?}")]
32 Timeout { timeout: Duration },
33
34 #[error("Session not found: {session_id}")]
36 SessionNotFound { session_id: Uuid },
37
38 #[error("Protocol version mismatch: expected {expected:?}, got {actual:?}")]
40 VersionMismatch {
41 expected: ProtocolVersion,
42 actual: ProtocolVersion,
43 },
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
48pub enum ProtocolState {
49 #[default]
51 Initial,
52
53 Handshake(HandshakeState),
55
56 Active(ActiveState),
58
59 Synchronizing(SyncState),
61
62 Error(ErrorState),
64
65 Shutdown,
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub enum HandshakeState {
72 Waiting,
74 InProgress,
76 Processing,
78 Completed,
80 Failed,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
86pub enum ActiveState {
87 Normal,
89 HighLoad,
91 Degraded,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub enum SyncState {
98 Requesting,
100 Receiving,
102 Applying,
104 Verifying,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
110pub enum ErrorState {
111 NetworkError,
113 ConsensusError,
115 CryptoError,
117 ResourceError,
119 InternalError,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SessionInfo {
126 pub id: Uuid,
128 pub peer_id: Vec<u8>,
130 pub protocol_version: ProtocolVersion,
132 pub state: ProtocolState,
134 pub started_at: SystemTime,
136 pub last_activity: SystemTime,
138 pub capabilities: Vec<String>,
140 pub metrics: SessionMetrics,
142}
143
144#[derive(Debug, Clone, Default, Serialize, Deserialize)]
146pub struct SessionMetrics {
147 pub messages_sent: u64,
149 pub messages_received: u64,
151 pub bytes_sent: u64,
153 pub bytes_received: u64,
155 pub avg_response_time: Duration,
157 pub error_count: u64,
159}
160
161#[derive(Debug)]
163pub struct ProtocolStateMachine {
164 current_state: ProtocolState,
166 previous_state: Option<ProtocolState>,
168 state_history: Vec<StateTransition>,
170 sessions: HashMap<Uuid, SessionInfo>,
172 started_at: Instant,
174 protocol_version: ProtocolVersion,
176 config: StateMachineConfig,
178}
179
180#[derive(Debug, Clone)]
182pub struct StateTransition {
183 pub timestamp: Instant,
185 pub from: ProtocolState,
187 pub to: ProtocolState,
189 pub reason: String,
191 pub duration: Duration,
193}
194
/// Limits and timeouts for a `ProtocolStateMachine`.
#[derive(Debug, Clone)]
pub struct StateMachineConfig {
    /// Session creation is refused once this many sessions exist.
    pub max_sessions: usize,
    /// Idle time after which a session counts as timed out during cleanup.
    pub session_timeout: Duration,
    /// Timeout for handshakes.
    pub handshake_timeout: Duration,
    /// Timeout for synchronization.
    pub sync_timeout: Duration,
    /// Upper bound on the number of transitions kept in the history.
    pub max_history_size: usize,
}
209
210impl Default for StateMachineConfig {
211 fn default() -> Self {
212 Self {
213 max_sessions: 1000,
214 session_timeout: Duration::from_secs(300), handshake_timeout: Duration::from_secs(30), sync_timeout: Duration::from_secs(60), max_history_size: 1000,
218 }
219 }
220}
221
222impl ProtocolStateMachine {
223 pub fn new(protocol_version: ProtocolVersion) -> Self {
225 Self {
226 current_state: ProtocolState::Initial,
227 previous_state: None,
228 state_history: Vec::new(),
229 sessions: HashMap::new(),
230 started_at: Instant::now(),
231 protocol_version,
232 config: StateMachineConfig::default(),
233 }
234 }
235
236 pub fn with_config(protocol_version: ProtocolVersion, config: StateMachineConfig) -> Self {
238 Self {
239 current_state: ProtocolState::Initial,
240 previous_state: None,
241 state_history: Vec::new(),
242 sessions: HashMap::new(),
243 started_at: Instant::now(),
244 protocol_version,
245 config,
246 }
247 }
248
249 pub fn current_state(&self) -> &ProtocolState {
251 &self.current_state
252 }
253
254 pub fn active_sessions(&self) -> usize {
256 self.sessions.len()
257 }
258
259 pub fn protocol_version(&self) -> &ProtocolVersion {
261 &self.protocol_version
262 }
263
264 pub fn uptime(&self) -> Duration {
266 self.started_at.elapsed()
267 }
268
269 pub fn transition_to(
271 &mut self,
272 new_state: ProtocolState,
273 reason: String,
274 ) -> Result<(), StateError> {
275 if !self.is_valid_transition(&self.current_state, &new_state) {
277 return Err(StateError::InvalidTransition {
278 from: self.current_state.clone(),
279 to: new_state,
280 });
281 }
282
283 let now = Instant::now();
284 let duration = if let Some(last_transition) = self.state_history.last() {
285 now.duration_since(last_transition.timestamp)
286 } else {
287 now.duration_since(self.started_at)
288 };
289
290 let transition = StateTransition {
292 timestamp: now,
293 from: self.current_state.clone(),
294 to: new_state.clone(),
295 reason: reason.clone(),
296 duration,
297 };
298
299 debug!(
300 "State transition: {:?} -> {:?} ({})",
301 self.current_state, new_state, reason
302 );
303
304 self.previous_state = Some(self.current_state.clone());
306 self.current_state = new_state;
307
308 self.state_history.push(transition);
310
311 if self.state_history.len() > self.config.max_history_size {
313 self.state_history.remove(0);
314 }
315
316 self.on_state_entry(&reason)?;
318
319 Ok(())
320 }
321
322 fn is_valid_transition(&self, from: &ProtocolState, to: &ProtocolState) -> bool {
324 use ActiveState::*;
325 use ErrorState::*;
326 use HandshakeState::*;
327 use ProtocolState::*;
328 use SyncState::*;
329
330 match (from, to) {
331 (Initial, Handshake(Waiting)) => true,
333 (Initial, Error(_)) => true,
334 (Initial, Shutdown) => true,
335
336 (Handshake(Waiting), Handshake(InProgress)) => true,
338 (Handshake(InProgress), Handshake(Processing)) => true,
339 (Handshake(InProgress), Handshake(Failed)) => true,
340 (Handshake(Processing), Handshake(Completed)) => true,
341 (Handshake(Processing), Handshake(Failed)) => true,
342 (Handshake(Completed), Active(Normal)) => true,
343 (Handshake(Failed), Error(NetworkError)) => true,
344 (Handshake(_), Shutdown) => true,
345
346 (Active(Normal), Active(HighLoad)) => true,
348 (Active(Normal), Active(Degraded)) => true,
349 (Active(Normal), Synchronizing(Requesting)) => true,
350 (Active(HighLoad), Active(Normal)) => true,
351 (Active(HighLoad), Active(Degraded)) => true,
352 (Active(Degraded), Active(Normal)) => true,
353 (Active(Degraded), Synchronizing(Requesting)) => true,
354 (Active(_), Error(_)) => true,
355 (Active(_), Shutdown) => true,
356
357 (Synchronizing(Requesting), Synchronizing(Receiving)) => true,
359 (Synchronizing(Requesting), Error(NetworkError)) => true,
360 (Synchronizing(Receiving), Synchronizing(Applying)) => true,
361 (Synchronizing(Receiving), Error(NetworkError)) => true,
362 (Synchronizing(Applying), Synchronizing(Verifying)) => true,
363 (Synchronizing(Applying), Error(InternalError)) => true,
364 (Synchronizing(Verifying), Active(Normal)) => true,
365 (Synchronizing(Verifying), Error(InternalError)) => true,
366 (Synchronizing(_), Shutdown) => true,
367
368 (Error(_), Initial) => true, (Error(_), Shutdown) => true,
371
372 (Shutdown, _) => false, (a, b) if a == b => true,
377
378 _ => false,
379 }
380 }
381
382 fn on_state_entry(&mut self, reason: &str) -> Result<(), StateError> {
384 let current_state = self.current_state.clone();
385 match ¤t_state {
386 ProtocolState::Initial => {
387 debug!("Entered Initial state: {}", reason);
388 self.sessions.clear();
390 }
391
392 ProtocolState::Handshake(handshake_state) => {
393 debug!("Entered Handshake state {:?}: {}", handshake_state, reason);
394 match handshake_state {
395 HandshakeState::InProgress => {
396 }
399 HandshakeState::Failed => {
400 warn!("Handshake failed: {}", reason);
401 self.cleanup_failed_sessions();
403 }
404 _ => {}
405 }
406 }
407
408 ProtocolState::Active(active_state) => {
409 debug!("Entered Active state {:?}: {}", active_state, reason);
410 match active_state {
411 ActiveState::HighLoad => {
412 warn!("Entering high load state, implementing load shedding");
414 }
415 ActiveState::Degraded => {
416 warn!("Entering degraded state: {}", reason);
417 }
418 _ => {}
419 }
420 }
421
422 ProtocolState::Synchronizing(sync_state) => {
423 debug!("Entered Synchronizing state {:?}: {}", sync_state, reason);
424 }
425
426 ProtocolState::Error(error_state) => {
427 error!("Entered Error state {:?}: {}", error_state, reason);
428 self.handle_error_state(error_state, reason)?;
430 }
431
432 ProtocolState::Shutdown => {
433 debug!("Entered Shutdown state: {}", reason);
434 self.begin_shutdown();
436 }
437 }
438
439 Ok(())
440 }
441
442 fn handle_error_state(
444 &mut self,
445 error_state: &ErrorState,
446 reason: &str,
447 ) -> Result<(), StateError> {
448 match error_state {
449 ErrorState::NetworkError => {
450 self.cleanup_failed_sessions();
452 }
453 ErrorState::ConsensusError => {
454 }
457 ErrorState::CryptoError => {
458 error!("Critical cryptographic error: {}", reason);
460 }
461 ErrorState::ResourceError => {
462 self.cleanup_resources();
464 }
465 ErrorState::InternalError => {
466 error!("Internal protocol error: {}", reason);
468 }
469 }
470 Ok(())
471 }
472
473 fn begin_shutdown(&mut self) {
475 debug!("Beginning graceful shutdown");
476
477 for (session_id, session) in &mut self.sessions {
479 debug!("Closing session: {}", session_id);
480 session.state = ProtocolState::Shutdown;
481 }
482
483 }
486
487 fn cleanup_failed_sessions(&mut self) {
489 let failed_sessions: Vec<Uuid> = self
490 .sessions
491 .iter()
492 .filter(|(_, session)| {
493 matches!(
494 session.state,
495 ProtocolState::Error(_) | ProtocolState::Handshake(HandshakeState::Failed)
496 )
497 })
498 .map(|(id, _)| *id)
499 .collect();
500
501 for session_id in failed_sessions {
502 debug!("Cleaning up failed session: {}", session_id);
503 self.sessions.remove(&session_id);
504 }
505 }
506
507 fn cleanup_resources(&mut self) {
509 debug!("Cleaning up resources");
510
511 if self.state_history.len() > self.config.max_history_size / 2 {
513 let keep_from = self.state_history.len() - self.config.max_history_size / 2;
514 self.state_history.drain(0..keep_from);
515 }
516
517 self.cleanup_timed_out_sessions();
519 }
520
521 fn cleanup_timed_out_sessions(&mut self) {
523 let now = SystemTime::now();
524 let timeout = self.config.session_timeout;
525
526 let timed_out_sessions: Vec<Uuid> = self
527 .sessions
528 .iter()
529 .filter(|(_, session)| {
530 now.duration_since(session.last_activity)
531 .unwrap_or(Duration::ZERO)
532 > timeout
533 })
534 .map(|(id, _)| *id)
535 .collect();
536
537 for session_id in timed_out_sessions {
538 debug!("Removing timed out session: {}", session_id);
539 self.sessions.remove(&session_id);
540 }
541 }
542
543 pub fn create_session(
545 &mut self,
546 peer_id: Vec<u8>,
547 protocol_version: ProtocolVersion,
548 capabilities: Vec<String>,
549 ) -> Result<Uuid, StateError> {
550 if self.sessions.len() >= self.config.max_sessions {
552 return Err(StateError::InvalidData {
553 reason: "Maximum number of sessions reached".to_string(),
554 });
555 }
556
557 if !self.protocol_version.is_compatible(&protocol_version) {
559 return Err(StateError::VersionMismatch {
560 expected: self.protocol_version.clone(),
561 actual: protocol_version,
562 });
563 }
564
565 let session_id = Uuid::new_v4();
566 let now = SystemTime::now();
567
568 let session = SessionInfo {
569 id: session_id,
570 peer_id,
571 protocol_version,
572 state: ProtocolState::Handshake(HandshakeState::Waiting),
573 started_at: now,
574 last_activity: now,
575 capabilities,
576 metrics: SessionMetrics::default(),
577 };
578
579 self.sessions.insert(session_id, session);
580
581 debug!("Created new session: {}", session_id);
582 Ok(session_id)
583 }
584
585 pub fn update_session_state(
587 &mut self,
588 session_id: Uuid,
589 new_state: ProtocolState,
590 ) -> Result<(), StateError> {
591 let current_session_state = self
593 .sessions
594 .get(&session_id)
595 .ok_or(StateError::SessionNotFound { session_id })?
596 .state
597 .clone();
598
599 if !self.is_valid_transition(¤t_session_state, &new_state) {
601 return Err(StateError::InvalidTransition {
602 from: current_session_state,
603 to: new_state,
604 });
605 }
606
607 let session = self
609 .sessions
610 .get_mut(&session_id)
611 .ok_or(StateError::SessionNotFound { session_id })?;
612 session.state = new_state;
613 session.last_activity = SystemTime::now();
614
615 Ok(())
616 }
617
618 pub fn get_session(&self, session_id: &Uuid) -> Option<&SessionInfo> {
620 self.sessions.get(session_id)
621 }
622
623 pub fn remove_session(&mut self, session_id: &Uuid) -> Option<SessionInfo> {
625 self.sessions.remove(session_id)
626 }
627
628 pub fn process_message(
630 &mut self,
631 message: &Message,
632 session_id: Option<Uuid>,
633 ) -> Result<(), StateError> {
634 if let Some(session_id) = session_id {
636 if let Some(session) = self.sessions.get_mut(&session_id) {
637 session.last_activity = SystemTime::now();
638 session.metrics.messages_received += 1;
639 session.metrics.bytes_received += message.payload.len() as u64;
640 }
641 }
642
643 match &message.msg_type {
645 MessageType::Handshake(handshake_type) => {
646 self.process_handshake_message(handshake_type, message, session_id)?;
647 }
648 MessageType::Control(_) => {
649 if !matches!(self.current_state, ProtocolState::Shutdown) {
651 debug!(
653 "Processing control message in state {:?}",
654 self.current_state
655 );
656 }
657 }
658 _ => {
659 match &self.current_state {
661 ProtocolState::Active(_) => {
662 debug!("Processing message in active state");
664 }
665 _ => {
666 warn!(
667 "Received message in non-active state: {:?}",
668 self.current_state
669 );
670 }
671 }
672 }
673 }
674
675 Ok(())
676 }
677
678 fn process_handshake_message(
680 &mut self,
681 handshake_type: &HandshakeType,
682 _message: &Message,
683 session_id: Option<Uuid>,
684 ) -> Result<(), StateError> {
685 match handshake_type {
686 HandshakeType::Init => {
687 if matches!(
688 self.current_state,
689 ProtocolState::Initial | ProtocolState::Handshake(_)
690 ) {
691 self.transition_to(
692 ProtocolState::Handshake(HandshakeState::InProgress),
693 "Received handshake init".to_string(),
694 )?;
695 }
696 }
697 HandshakeType::Response => {
698 if matches!(
699 self.current_state,
700 ProtocolState::Handshake(HandshakeState::InProgress)
701 ) {
702 self.transition_to(
703 ProtocolState::Handshake(HandshakeState::Processing),
704 "Received handshake response".to_string(),
705 )?;
706 }
707 }
708 HandshakeType::Complete => {
709 if matches!(
710 self.current_state,
711 ProtocolState::Handshake(HandshakeState::Processing)
712 ) {
713 self.transition_to(
714 ProtocolState::Handshake(HandshakeState::Completed),
715 "Handshake completed".to_string(),
716 )?;
717
718 self.transition_to(
720 ProtocolState::Active(ActiveState::Normal),
721 "Handshake successful, entering active state".to_string(),
722 )?;
723 }
724 }
725 HandshakeType::VersionNegotiation => {
726 debug!("Processing version negotiation");
728 }
729 }
730
731 if let Some(session_id) = session_id {
733 if let Some(session) = self.sessions.get_mut(&session_id) {
734 session.state = self.current_state.clone();
735 }
736 }
737
738 Ok(())
739 }
740
741 pub fn get_state_history(&self) -> &[StateTransition] {
743 &self.state_history
744 }
745
746 pub fn get_sessions(&self) -> &HashMap<Uuid, SessionInfo> {
748 &self.sessions
749 }
750
751 pub fn is_healthy(&self) -> bool {
753 !matches!(
754 self.current_state,
755 ProtocolState::Error(_) | ProtocolState::Shutdown
756 )
757 }
758
759 pub fn get_metrics(&self) -> StateMachineMetrics {
761 let mut total_messages_sent = 0;
762 let mut total_messages_received = 0;
763 let mut total_bytes_sent = 0;
764 let mut total_bytes_received = 0;
765 let mut total_errors = 0;
766
767 for session in self.sessions.values() {
768 total_messages_sent += session.metrics.messages_sent;
769 total_messages_received += session.metrics.messages_received;
770 total_bytes_sent += session.metrics.bytes_sent;
771 total_bytes_received += session.metrics.bytes_received;
772 total_errors += session.metrics.error_count;
773 }
774
775 StateMachineMetrics {
776 current_state: self.current_state.clone(),
777 uptime: self.uptime(),
778 active_sessions: self.sessions.len(),
779 total_state_transitions: self.state_history.len(),
780 total_messages_sent,
781 total_messages_received,
782 total_bytes_sent,
783 total_bytes_received,
784 total_errors,
785 }
786 }
787}
788
789#[derive(Debug, Clone, Serialize, Deserialize, Default)]
791pub struct StateMachineMetrics {
792 pub current_state: ProtocolState,
794 pub uptime: Duration,
796 pub active_sessions: usize,
798 pub total_state_transitions: usize,
800 pub total_messages_sent: u64,
802 pub total_messages_received: u64,
804 pub total_bytes_sent: u64,
806 pub total_bytes_received: u64,
808 pub total_errors: u64,
810}
811
812pub trait StateManager {
814 fn init() -> Result<ProtocolStateMachine, StateError>;
816
817 fn transition(&mut self, new_state: ProtocolState) -> Result<(), StateError>;
819
820 fn get_state(&self) -> &ProtocolState;
822
823 fn validate_transition(&self, new_state: &ProtocolState) -> bool;
825}
826
827impl StateManager for ProtocolStateMachine {
828 fn init() -> Result<ProtocolStateMachine, StateError> {
829 Ok(ProtocolStateMachine::new(ProtocolVersion::CURRENT))
830 }
831
832 fn transition(&mut self, new_state: ProtocolState) -> Result<(), StateError> {
833 self.transition_to(new_state, "Manual transition".to_string())
834 }
835
836 fn get_state(&self) -> &ProtocolState {
837 &self.current_state
838 }
839
840 fn validate_transition(&self, new_state: &ProtocolState) -> bool {
841 self.is_valid_transition(&self.current_state, new_state)
842 }
843}