alopex_chirps_file_transfer/
session.rs

1use crate::TransferSessionId;
2use crate::chunk::ChunkTracker;
3use crate::error::FileTransferError;
4use crate::manifest::TransferManifest;
5use crate::options::{TransferMode, TransferOptions};
6use alopex_chirps_wire::node_id::NodeId;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9use std::time::SystemTime;
10use tokio::sync::watch;
11
/// Kind of transfer session.
///
/// Stored in `TransferSession::kind` and copied verbatim into
/// `TransferSessionInfo::kind` when a snapshot is taken.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferKind {
    /// Single target send.
    Send,
    /// Broadcast to multiple targets.
    Broadcast,
    /// Sync operation.
    Sync,
}
22
/// State machine for a transfer session.
///
/// `TransferSession::transition_to` accepts only the following moves:
///
/// - `Initializing` -> `InProgress`, `Failed`, `Cancelled`
/// - `InProgress` -> `Paused`, `Verifying`, `Completed`, `Failed`, `Cancelled`
/// - `Paused` -> `InProgress`, `Failed`, `Cancelled`
/// - `Verifying` -> `Completed`, `Failed`
/// - `Completed`, `Failed`, `Cancelled` -> nothing (terminal)
///
/// `TransferSession::fail` sets `Failed` directly and does not consult this
/// table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferState {
    /// Session is being initialized. This is the state assigned by
    /// `TransferSession::new`.
    Initializing,
    /// Transfer is in progress.
    InProgress,
    /// Transfer is paused. It can only resume to `InProgress`, fail, or be
    /// cancelled.
    Paused,
    /// Transfer is verifying integrity. Reachable only from `InProgress`.
    Verifying,
    /// Transfer completed successfully (terminal).
    Completed,
    /// Transfer failed (terminal).
    Failed,
    /// Transfer was cancelled (terminal).
    Cancelled,
}
41
/// Persistent record of an in-progress or completed transfer.
///
/// Every field except `control` is serialized. `state`, `updated_at` and
/// `error` are maintained by `transition_to` and `fail`; because all fields
/// are public they can also be written directly, which bypasses the
/// transition checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferSession {
    /// Identifier of this session.
    pub id: TransferSessionId,
    /// Kind of transfer this session represents.
    pub kind: TransferKind,
    /// Transfer mode supplied at construction.
    pub mode: TransferMode,
    /// Source node of the transfer.
    pub source_node: NodeId,
    /// Target nodes of the transfer.
    pub target_nodes: Vec<NodeId>,
    /// Source path of the transfer.
    // NOTE(review): presumably a path on `source_node` — confirm with callers.
    pub source_path: PathBuf,
    /// Destination path of the transfer.
    // NOTE(review): presumably a path on each target node — confirm with callers.
    pub dest_path: PathBuf,
    /// Current position in the `TransferState` state machine.
    pub state: TransferState,
    /// Manifest supplied at construction.
    pub manifest: TransferManifest,
    /// Chunk tracker supplied at construction.
    pub chunk_tracker: ChunkTracker,
    /// Options supplied at construction.
    pub options: TransferOptions,
    /// Wall-clock time at which `new` created the session.
    pub created_at: SystemTime,
    /// Wall-clock time of the last change made by `transition_to` or `fail`;
    /// equal to `created_at` until the first such change.
    pub updated_at: SystemTime,
    /// Message recorded by `fail`. A successful `transition_to` into any
    /// state other than `Failed` resets it to `None`.
    pub error: Option<String>,
    /// In-memory pause/cancel handle. It is skipped by serde, so a
    /// deserialized session receives `TransferControl::default()`, whose
    /// state is `Running` regardless of the persisted `state`.
    #[serde(skip)]
    pub control: TransferControl,
}
62
/// Control state exposed to transfer loops.
///
/// This is the value carried by the watch channel inside `TransferControl`.
/// It is separate from `TransferState` and is not serialized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferControlState {
    /// Transfer should continue. This is the initial value of a default
    /// `TransferControl`.
    Running,
    /// Transfer should wait without progressing.
    Paused,
    /// Transfer should abort.
    Cancelled,
}
73
/// Control channel for pausing or cancelling a transfer.
///
/// Holds only the sending half of a `tokio::sync::watch` channel; observers
/// obtain receivers through `subscribe`.
// NOTE(review): clones are expected to share one underlying channel, per the
// tokio `watch::Sender` `Clone` impl — confirm against the tokio version in use.
#[derive(Debug, Clone)]
pub struct TransferControl {
    /// Sending half of the watch channel; also holds the current value.
    state: watch::Sender<TransferControlState>,
}
79
80impl Default for TransferControl {
81    fn default() -> Self {
82        let (state, _) = watch::channel(TransferControlState::Running);
83        TransferControl { state }
84    }
85}
86
87impl TransferControl {
88    /// Subscribes to control state changes.
89    ///
90    /// # Panics
91    /// This method does not panic.
92    pub fn subscribe(&self) -> watch::Receiver<TransferControlState> {
93        self.state.subscribe()
94    }
95
96    /// Returns the current control state.
97    ///
98    /// # Panics
99    /// This method does not panic.
100    pub fn state(&self) -> TransferControlState {
101        *self.state.borrow()
102    }
103
104    /// Updates the control state.
105    ///
106    /// # Panics
107    /// This method does not panic.
108    pub fn set_state(&self, state: TransferControlState) {
109        let _ = self.state.send(state);
110    }
111}
112
/// Public snapshot of a transfer session.
///
/// Built from a `&TransferSession` through the `From` impl in this file. It
/// carries the same values as the session except for `manifest`,
/// `chunk_tracker`, `options` and `control`, which are omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferSessionInfo {
    /// Identifier of the session this snapshot was taken from.
    pub id: TransferSessionId,
    /// Kind of transfer.
    pub kind: TransferKind,
    /// Transfer mode.
    pub mode: TransferMode,
    /// Source node of the transfer.
    pub source_node: NodeId,
    /// Target nodes of the transfer.
    pub target_nodes: Vec<NodeId>,
    /// Source path of the transfer.
    pub source_path: PathBuf,
    /// Destination path of the transfer.
    pub dest_path: PathBuf,
    /// Session state at the time the snapshot was taken.
    pub state: TransferState,
    /// Creation time of the session.
    pub created_at: SystemTime,
    /// Last update time of the session at the time of the snapshot.
    pub updated_at: SystemTime,
    /// Error message of the session at the time of the snapshot, if any.
    pub error: Option<String>,
}
128
129impl TransferSession {
130    #[allow(clippy::too_many_arguments)]
131    /// Creates a new transfer session in the initializing state.
132    ///
133    /// # Panics
134    /// This method does not panic.
135    pub fn new(
136        id: TransferSessionId,
137        kind: TransferKind,
138        mode: TransferMode,
139        source_node: NodeId,
140        target_nodes: Vec<NodeId>,
141        source_path: PathBuf,
142        dest_path: PathBuf,
143        manifest: TransferManifest,
144        chunk_tracker: ChunkTracker,
145        options: TransferOptions,
146    ) -> Self {
147        let now = SystemTime::now();
148        TransferSession {
149            id,
150            kind,
151            mode,
152            source_node,
153            target_nodes,
154            source_path,
155            dest_path,
156            state: TransferState::Initializing,
157            manifest,
158            chunk_tracker,
159            options,
160            created_at: now,
161            updated_at: now,
162            error: None,
163            control: TransferControl::default(),
164        }
165    }
166
167    /// Transitions the session to a new state if allowed.
168    ///
169    /// # Errors
170    /// Returns `FileTransferError::InvalidState` if the transition is not allowed.
171    ///
172    /// # Panics
173    /// This method does not panic.
174    pub fn transition_to(&mut self, next: TransferState) -> Result<(), FileTransferError> {
175        if !Self::can_transition(self.state, next) {
176            return Err(FileTransferError::InvalidState {
177                expected: format!("{:?}", self.state),
178                actual: format!("{:?}", next),
179            });
180        }
181        self.state = next;
182        self.updated_at = SystemTime::now();
183        if !matches!(next, TransferState::Failed) {
184            self.error = None;
185        }
186        Ok(())
187    }
188
189    /// Marks the session as failed with an error message.
190    ///
191    /// # Panics
192    /// This method does not panic.
193    pub fn fail(&mut self, message: impl Into<String>) {
194        self.state = TransferState::Failed;
195        self.error = Some(message.into());
196        self.updated_at = SystemTime::now();
197    }
198
199    fn can_transition(current: TransferState, next: TransferState) -> bool {
200        match current {
201            TransferState::Initializing => matches!(
202                next,
203                TransferState::InProgress | TransferState::Failed | TransferState::Cancelled
204            ),
205            TransferState::InProgress => matches!(
206                next,
207                TransferState::Paused
208                    | TransferState::Verifying
209                    | TransferState::Completed
210                    | TransferState::Failed
211                    | TransferState::Cancelled
212            ),
213            TransferState::Paused => matches!(
214                next,
215                TransferState::InProgress | TransferState::Failed | TransferState::Cancelled
216            ),
217            TransferState::Verifying => {
218                matches!(next, TransferState::Completed | TransferState::Failed)
219            }
220            TransferState::Completed | TransferState::Failed | TransferState::Cancelled => false,
221        }
222    }
223}
224
225impl From<&TransferSession> for TransferSessionInfo {
226    fn from(session: &TransferSession) -> Self {
227        TransferSessionInfo {
228            id: session.id,
229            kind: session.kind,
230            mode: session.mode,
231            source_node: session.source_node,
232            target_nodes: session.target_nodes.clone(),
233            source_path: session.source_path.clone(),
234            dest_path: session.dest_path.clone(),
235            state: session.state,
236            created_at: session.created_at,
237            updated_at: session.updated_at,
238            error: session.error.clone(),
239        }
240    }
241}