alopex_chirps_file_transfer/
session.rs1use crate::TransferSessionId;
2use crate::chunk::ChunkTracker;
3use crate::error::FileTransferError;
4use crate::manifest::TransferManifest;
5use crate::options::{TransferMode, TransferOptions};
6use alopex_chirps_wire::node_id::NodeId;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9use std::time::SystemTime;
10use tokio::sync::watch;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14pub enum TransferKind {
15 Send,
17 Broadcast,
19 Sync,
21}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum TransferState {
26 Initializing,
28 InProgress,
30 Paused,
32 Verifying,
34 Completed,
36 Failed,
38 Cancelled,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct TransferSession {
45 pub id: TransferSessionId,
46 pub kind: TransferKind,
47 pub mode: TransferMode,
48 pub source_node: NodeId,
49 pub target_nodes: Vec<NodeId>,
50 pub source_path: PathBuf,
51 pub dest_path: PathBuf,
52 pub state: TransferState,
53 pub manifest: TransferManifest,
54 pub chunk_tracker: ChunkTracker,
55 pub options: TransferOptions,
56 pub created_at: SystemTime,
57 pub updated_at: SystemTime,
58 pub error: Option<String>,
59 #[serde(skip)]
60 pub control: TransferControl,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum TransferControlState {
66 Running,
68 Paused,
70 Cancelled,
72}
73
74#[derive(Debug, Clone)]
76pub struct TransferControl {
77 state: watch::Sender<TransferControlState>,
78}
79
80impl Default for TransferControl {
81 fn default() -> Self {
82 let (state, _) = watch::channel(TransferControlState::Running);
83 TransferControl { state }
84 }
85}
86
87impl TransferControl {
88 pub fn subscribe(&self) -> watch::Receiver<TransferControlState> {
93 self.state.subscribe()
94 }
95
96 pub fn state(&self) -> TransferControlState {
101 *self.state.borrow()
102 }
103
104 pub fn set_state(&self, state: TransferControlState) {
109 let _ = self.state.send(state);
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct TransferSessionInfo {
116 pub id: TransferSessionId,
117 pub kind: TransferKind,
118 pub mode: TransferMode,
119 pub source_node: NodeId,
120 pub target_nodes: Vec<NodeId>,
121 pub source_path: PathBuf,
122 pub dest_path: PathBuf,
123 pub state: TransferState,
124 pub created_at: SystemTime,
125 pub updated_at: SystemTime,
126 pub error: Option<String>,
127}
128
129impl TransferSession {
130 #[allow(clippy::too_many_arguments)]
131 pub fn new(
136 id: TransferSessionId,
137 kind: TransferKind,
138 mode: TransferMode,
139 source_node: NodeId,
140 target_nodes: Vec<NodeId>,
141 source_path: PathBuf,
142 dest_path: PathBuf,
143 manifest: TransferManifest,
144 chunk_tracker: ChunkTracker,
145 options: TransferOptions,
146 ) -> Self {
147 let now = SystemTime::now();
148 TransferSession {
149 id,
150 kind,
151 mode,
152 source_node,
153 target_nodes,
154 source_path,
155 dest_path,
156 state: TransferState::Initializing,
157 manifest,
158 chunk_tracker,
159 options,
160 created_at: now,
161 updated_at: now,
162 error: None,
163 control: TransferControl::default(),
164 }
165 }
166
167 pub fn transition_to(&mut self, next: TransferState) -> Result<(), FileTransferError> {
175 if !Self::can_transition(self.state, next) {
176 return Err(FileTransferError::InvalidState {
177 expected: format!("{:?}", self.state),
178 actual: format!("{:?}", next),
179 });
180 }
181 self.state = next;
182 self.updated_at = SystemTime::now();
183 if !matches!(next, TransferState::Failed) {
184 self.error = None;
185 }
186 Ok(())
187 }
188
189 pub fn fail(&mut self, message: impl Into<String>) {
194 self.state = TransferState::Failed;
195 self.error = Some(message.into());
196 self.updated_at = SystemTime::now();
197 }
198
199 fn can_transition(current: TransferState, next: TransferState) -> bool {
200 match current {
201 TransferState::Initializing => matches!(
202 next,
203 TransferState::InProgress | TransferState::Failed | TransferState::Cancelled
204 ),
205 TransferState::InProgress => matches!(
206 next,
207 TransferState::Paused
208 | TransferState::Verifying
209 | TransferState::Completed
210 | TransferState::Failed
211 | TransferState::Cancelled
212 ),
213 TransferState::Paused => matches!(
214 next,
215 TransferState::InProgress | TransferState::Failed | TransferState::Cancelled
216 ),
217 TransferState::Verifying => {
218 matches!(next, TransferState::Completed | TransferState::Failed)
219 }
220 TransferState::Completed | TransferState::Failed | TransferState::Cancelled => false,
221 }
222 }
223}
224
225impl From<&TransferSession> for TransferSessionInfo {
226 fn from(session: &TransferSession) -> Self {
227 TransferSessionInfo {
228 id: session.id,
229 kind: session.kind,
230 mode: session.mode,
231 source_node: session.source_node,
232 target_nodes: session.target_nodes.clone(),
233 source_path: session.source_path.clone(),
234 dest_path: session.dest_path.clone(),
235 state: session.state,
236 created_at: session.created_at,
237 updated_at: session.updated_at,
238 error: session.error.clone(),
239 }
240 }
241}