1use clasp_core::{
8 codec, FederationOp, FederationSyncMessage, HelloMessage, Message, QoS, SetMessage,
9 SubscribeMessage, Value, PROTOCOL_VERSION,
10};
11use clasp_transport::{TransportEvent, TransportReceiver, TransportSender};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::mpsc;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{FederationConfig, PeerInfo, PeerState};
18use crate::error::{FederationError, Result};
19
/// Events a [`FederationLink`] reports to its owner over the `mpsc` event channel.
#[derive(Debug)]
pub enum LinkEvent {
    /// The peer sent a `DeclareNamespaces` federation message.
    PeerNamespaces {
        // `origin` carried by the message; falls back to the known peer's
        // router id, or an empty string when neither is available.
        router_id: String,
        // Patterns listed in the declaration.
        patterns: Vec<String>,
    },
    /// A value arrived from the peer, either as a `Set` message or as one
    /// entry of a `Snapshot`.
    RemoteSet {
        address: String,
        value: Value,
        // `None` when the incoming `Set` carried no revision; snapshot entries
        // always carry one.
        revision: Option<u64>,
        // Router id of the peer the link is attached to (empty if no peer is known yet).
        origin: String,
    },
    /// A `Publish` message arrived from the peer; the whole message is passed along.
    RemotePublish { message: Message, origin: String },
    /// The peer sent a `SyncComplete` federation message.
    SyncComplete {
        router_id: String,
        // First pattern listed in the message (empty string if it listed none).
        pattern: String,
        // `since_revision` from the message, or 0 when absent.
        revision: u64,
    },
    /// The transport reported that the peer disconnected.
    Disconnected {
        // Empty string if the disconnect happened before a peer was recorded.
        router_id: String,
        // Reason supplied by the transport, if any.
        reason: Option<String>,
    },
    /// Emitted after a `SyncComplete` has moved the link into the active state.
    Connected { router_id: String },
}
51
/// One federation connection to a remote peer.
///
/// The link sends the handshake, reacts to decoded messages from the peer and
/// reports what it observes as [`LinkEvent`]s.
pub struct FederationLink {
    // Local settings: client name, features, auth token, owned namespaces and
    // this router's id are all read from here when building outgoing messages.
    config: FederationConfig,
    // Outgoing half of the transport; every encoded message goes through it.
    sender: Arc<dyn TransportSender>,
    // `None` until a `Welcome` message has been received from the peer.
    peer: Option<PeerInfo>,
    // Lifecycle state of this link; starts as `PeerState::Connecting`.
    state: PeerState,
    // Channel on which `LinkEvent`s are delivered to the owner of the link.
    event_tx: mpsc::Sender<LinkEvent>,
    // Address -> revision recorded from the most recent `Set` / `Snapshot`
    // entry received for that address.
    revision_vector: HashMap<String, u64>,
}
71
72impl FederationLink {
73 pub fn new(
77 config: FederationConfig,
78 sender: Arc<dyn TransportSender>,
79 event_tx: mpsc::Sender<LinkEvent>,
80 ) -> Self {
81 Self {
82 config,
83 sender,
84 peer: None,
85 state: PeerState::Connecting,
86 event_tx,
87 revision_vector: HashMap::new(),
88 }
89 }
90
91 pub async fn run(mut self, mut receiver: Box<dyn TransportReceiver>) -> Result<()> {
96 self.send_hello().await?;
98 self.state = PeerState::Handshaking;
99
100 loop {
102 match receiver.recv().await {
103 Some(TransportEvent::Data(data)) => {
104 if let Err(e) = self.handle_data(&data).await {
105 error!("Federation link error: {}", e);
106 break;
107 }
108 }
109 Some(TransportEvent::Disconnected { reason }) => {
110 info!(
111 "Federation peer disconnected: {:?}",
112 reason.as_deref().unwrap_or("unknown")
113 );
114 let router_id = self
115 .peer
116 .as_ref()
117 .map(|p| p.router_id.clone())
118 .unwrap_or_default();
119 let _ = self
120 .event_tx
121 .send(LinkEvent::Disconnected { router_id, reason })
122 .await;
123 break;
124 }
125 Some(TransportEvent::Error(e)) => {
126 error!("Federation transport error: {}", e);
127 break;
128 }
129 Some(TransportEvent::Connected) => {
130 debug!("Federation transport connected event");
131 }
132 None => {
133 debug!("Federation transport stream ended");
134 break;
135 }
136 }
137 }
138
139 self.state = PeerState::Disconnected;
140 Ok(())
141 }
142
143 async fn send_hello(&self) -> Result<()> {
145 let hello = Message::Hello(HelloMessage {
146 version: PROTOCOL_VERSION,
147 name: self.config.client_name.clone(),
148 features: self.config.features.clone(),
149 capabilities: None,
150 token: self.config.auth_token.clone(),
151 });
152
153 self.send_message(&hello, QoS::Confirm).await
154 }
155
156 async fn declare_namespaces(&self) -> Result<()> {
158 let msg = Message::FederationSync(FederationSyncMessage {
159 op: FederationOp::DeclareNamespaces,
160 patterns: self.config.owned_namespaces.clone(),
161 revisions: HashMap::new(),
162 since_revision: None,
163 origin: Some(self.config.router_id.clone()),
164 });
165
166 self.send_message(&msg, QoS::Confirm).await
167 }
168
169 async fn subscribe_to_peer(&self, patterns: &[String]) -> Result<()> {
171 for (i, pattern) in patterns.iter().enumerate() {
172 let sub = Message::Subscribe(SubscribeMessage {
173 id: (1000 + i) as u32, pattern: pattern.clone(),
175 types: vec![],
176 options: None,
177 });
178 self.send_message(&sub, QoS::Confirm).await?;
179 }
180 Ok(())
181 }
182
183 async fn request_sync(&self, pattern: &str, since: Option<u64>) -> Result<()> {
185 let msg = Message::FederationSync(FederationSyncMessage {
186 op: FederationOp::RequestSync,
187 patterns: vec![pattern.to_string()],
188 revisions: HashMap::new(),
189 since_revision: since,
190 origin: Some(self.config.router_id.clone()),
191 });
192
193 self.send_message(&msg, QoS::Confirm).await
194 }
195
196 async fn send_revision_vector(&self) -> Result<()> {
198 let msg = Message::FederationSync(FederationSyncMessage {
199 op: FederationOp::RevisionVector,
200 patterns: vec![],
201 revisions: self.revision_vector.clone(),
202 since_revision: None,
203 origin: Some(self.config.router_id.clone()),
204 });
205
206 self.send_message(&msg, QoS::Confirm).await
207 }
208
209 pub async fn forward_set(&self, msg: &SetMessage, origin: &str) -> Result<()> {
211 if let Some(ref peer) = self.peer {
213 if origin == peer.router_id {
214 return Ok(());
215 }
216 }
217
218 let set = Message::Set(SetMessage {
219 address: msg.address.clone(),
220 value: msg.value.clone(),
221 revision: msg.revision,
222 lock: false,
223 unlock: false,
224 ttl: None,
225 });
226
227 self.send_message(&set, QoS::Confirm).await
228 }
229
230 pub async fn forward_publish(&self, msg: &Message, origin: &str) -> Result<()> {
232 if let Some(ref peer) = self.peer {
234 if origin == peer.router_id {
235 return Ok(());
236 }
237 }
238
239 self.send_message(msg, QoS::Fire).await
240 }
241
242 pub fn peer(&self) -> Option<&PeerInfo> {
244 self.peer.as_ref()
245 }
246
247 pub fn state(&self) -> PeerState {
249 self.state
250 }
251
252 pub fn is_active(&self) -> bool {
254 self.state == PeerState::Active
255 }
256
257 async fn handle_data(&mut self, data: &[u8]) -> Result<()> {
263 let (msg, _frame) =
264 codec::decode(data).map_err(|e| FederationError::Codec(e.to_string()))?;
265
266 match msg {
267 Message::Welcome(welcome) => {
268 info!(
269 "Federation handshake: received WELCOME from '{}' (session: {})",
270 welcome.name, welcome.session
271 );
272
273 self.peer = Some(PeerInfo {
275 router_id: welcome.session.clone(),
276 session_id: Some(welcome.session),
277 namespaces: vec![],
278 endpoint: None,
279 outbound: true,
280 state: PeerState::Handshaking,
281 });
282
283 self.declare_namespaces().await?;
285 self.state = PeerState::Syncing;
286 }
287
288 Message::FederationSync(fed_msg) => {
289 self.handle_federation_sync(fed_msg).await?;
290 }
291
292 Message::Set(set_msg) => {
293 let origin = self
295 .peer
296 .as_ref()
297 .map(|p| p.router_id.clone())
298 .unwrap_or_default();
299
300 if let Some(rev) = set_msg.revision {
302 self.revision_vector.insert(set_msg.address.clone(), rev);
303 }
304
305 let _ = self
306 .event_tx
307 .send(LinkEvent::RemoteSet {
308 address: set_msg.address,
309 value: set_msg.value,
310 revision: set_msg.revision,
311 origin,
312 })
313 .await;
314 }
315
316 Message::Publish(_) => {
317 let origin = self
318 .peer
319 .as_ref()
320 .map(|p| p.router_id.clone())
321 .unwrap_or_default();
322
323 let _ = self
324 .event_tx
325 .send(LinkEvent::RemotePublish {
326 message: msg,
327 origin,
328 })
329 .await;
330 }
331
332 Message::Snapshot(snapshot) => {
333 let origin = self
335 .peer
336 .as_ref()
337 .map(|p| p.router_id.clone())
338 .unwrap_or_default();
339
340 for param in snapshot.params {
341 self.revision_vector
342 .insert(param.address.clone(), param.revision);
343 let _ = self
344 .event_tx
345 .send(LinkEvent::RemoteSet {
346 address: param.address,
347 value: param.value,
348 revision: Some(param.revision),
349 origin: origin.clone(),
350 })
351 .await;
352 }
353 }
354
355 Message::Ack(_) => {
356 }
358
359 Message::Error(err) => {
360 warn!(
361 "Federation peer error: {} (code: {})",
362 err.message, err.code
363 );
364 }
365
366 Message::Ping => {
367 self.send_message(&Message::Pong, QoS::Fire).await?;
369 }
370
371 _ => {
372 debug!(
373 "Federation link: ignoring message type {:?}",
374 msg.type_code()
375 );
376 }
377 }
378
379 Ok(())
380 }
381
382 async fn handle_federation_sync(&mut self, msg: FederationSyncMessage) -> Result<()> {
384 match msg.op {
385 FederationOp::DeclareNamespaces => {
386 let router_id = msg
387 .origin
388 .clone()
389 .or_else(|| self.peer.as_ref().map(|p| p.router_id.clone()))
390 .unwrap_or_default();
391
392 info!("Peer {} declares namespaces: {:?}", router_id, msg.patterns);
393
394 if let Some(ref mut peer) = self.peer {
396 peer.namespaces = msg.patterns.clone();
397 }
398
399 let _ = self
401 .event_tx
402 .send(LinkEvent::PeerNamespaces {
403 router_id: router_id.clone(),
404 patterns: msg.patterns.clone(),
405 })
406 .await;
407
408 self.subscribe_to_peer(&msg.patterns).await?;
410
411 for pattern in &msg.patterns {
413 self.request_sync(pattern, None).await?;
414 }
415 }
416
417 FederationOp::RequestSync => {
418 debug!("Peer requests sync for patterns: {:?}", msg.patterns);
419 self.send_revision_vector().await?;
422 }
423
424 FederationOp::RevisionVector => {
425 debug!(
426 "Received revision vector with {} entries",
427 msg.revisions.len()
428 );
429 }
432
433 FederationOp::SyncComplete => {
434 let router_id = self
435 .peer
436 .as_ref()
437 .map(|p| p.router_id.clone())
438 .unwrap_or_default();
439
440 let pattern = msg.patterns.first().cloned().unwrap_or_default();
441 let revision = msg.since_revision.unwrap_or(0);
442
443 info!(
444 "Sync complete for pattern '{}' at revision {}",
445 pattern, revision
446 );
447
448 if let Some(ref mut peer) = self.peer {
449 peer.state = PeerState::Active;
450 }
451 self.state = PeerState::Active;
452
453 let _ = self
454 .event_tx
455 .send(LinkEvent::SyncComplete {
456 router_id,
457 pattern,
458 revision,
459 })
460 .await;
461
462 let _ = self
463 .event_tx
464 .send(LinkEvent::Connected {
465 router_id: self
466 .peer
467 .as_ref()
468 .map(|p| p.router_id.clone())
469 .unwrap_or_default(),
470 })
471 .await;
472 }
473 }
474
475 Ok(())
476 }
477
478 async fn send_message(&self, msg: &Message, _qos: QoS) -> Result<()> {
480 let data = codec::encode(msg).map_err(|e| FederationError::Codec(e.to_string()))?;
481 self.sender
482 .send(data)
483 .await
484 .map_err(|e| FederationError::Transport(e.to_string()))
485 }
486}