Skip to main content

clasp_federation/
link.rs

1//! Federation link -- manages a connection to a peer router
2//!
3//! A FederationLink represents one side of a router-to-router connection.
4//! It uses the standard CLASP protocol to communicate, appearing as a
5//! normal client session on the peer router.
6
7use clasp_core::{
8    codec, FederationOp, FederationSyncMessage, HelloMessage, Message, QoS, SetMessage,
9    SubscribeMessage, Value, PROTOCOL_VERSION,
10};
11use clasp_transport::{TransportEvent, TransportReceiver, TransportSender};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::mpsc;
15use tracing::{debug, error, info, warn};
16
17use crate::config::{FederationConfig, PeerInfo, PeerState};
18use crate::error::{FederationError, Result};
19
20/// Events emitted by a federation link to the local router
21#[derive(Debug)]
22pub enum LinkEvent {
23    /// Peer declared its namespace patterns
24    PeerNamespaces {
25        router_id: String,
26        patterns: Vec<String>,
27    },
28    /// Received a SET from the peer (should be applied to local state)
29    RemoteSet {
30        address: String,
31        value: Value,
32        revision: Option<u64>,
33        origin: String,
34    },
35    /// Received a PUBLISH from the peer (should be broadcast locally)
36    RemotePublish { message: Message, origin: String },
37    /// Peer sync complete
38    SyncComplete {
39        router_id: String,
40        pattern: String,
41        revision: u64,
42    },
43    /// Peer disconnected
44    Disconnected {
45        router_id: String,
46        reason: Option<String>,
47    },
48    /// Peer connected and handshake complete
49    Connected { router_id: String },
50}
51
52/// A federation link to a single peer router.
53///
54/// The link connects to a peer router as a normal CLASP client,
55/// exchanges federation metadata, syncs state, and then relays
56/// messages bidirectionally based on namespace ownership.
57pub struct FederationLink {
58    /// Local router configuration
59    config: FederationConfig,
60    /// Transport sender to the peer
61    sender: Arc<dyn TransportSender>,
62    /// Peer information (populated after handshake)
63    peer: Option<PeerInfo>,
64    /// Current connection state
65    state: PeerState,
66    /// Channel for sending events to the local router
67    event_tx: mpsc::Sender<LinkEvent>,
68    /// Revision vector: address -> last known revision from this peer
69    revision_vector: HashMap<String, u64>,
70}
71
72impl FederationLink {
73    /// Create a new federation link with an established transport connection.
74    ///
75    /// After creation, call `run()` to start the handshake and message relay loop.
76    pub fn new(
77        config: FederationConfig,
78        sender: Arc<dyn TransportSender>,
79        event_tx: mpsc::Sender<LinkEvent>,
80    ) -> Self {
81        Self {
82            config,
83            sender,
84            peer: None,
85            state: PeerState::Connecting,
86            event_tx,
87            revision_vector: HashMap::new(),
88        }
89    }
90
91    /// Run the federation link protocol.
92    ///
93    /// This performs the handshake, initial sync, and then relays messages
94    /// until the connection is closed. Runs as an async task.
95    pub async fn run(mut self, mut receiver: Box<dyn TransportReceiver>) -> Result<()> {
96        // Step 1: Send HELLO with federation feature
97        self.send_hello().await?;
98        self.state = PeerState::Handshaking;
99
100        // Step 2: Wait for WELCOME and process messages
101        loop {
102            match receiver.recv().await {
103                Some(TransportEvent::Data(data)) => {
104                    if let Err(e) = self.handle_data(&data).await {
105                        error!("Federation link error: {}", e);
106                        break;
107                    }
108                }
109                Some(TransportEvent::Disconnected { reason }) => {
110                    info!(
111                        "Federation peer disconnected: {:?}",
112                        reason.as_deref().unwrap_or("unknown")
113                    );
114                    let router_id = self
115                        .peer
116                        .as_ref()
117                        .map(|p| p.router_id.clone())
118                        .unwrap_or_default();
119                    let _ = self
120                        .event_tx
121                        .send(LinkEvent::Disconnected { router_id, reason })
122                        .await;
123                    break;
124                }
125                Some(TransportEvent::Error(e)) => {
126                    error!("Federation transport error: {}", e);
127                    break;
128                }
129                Some(TransportEvent::Connected) => {
130                    debug!("Federation transport connected event");
131                }
132                None => {
133                    debug!("Federation transport stream ended");
134                    break;
135                }
136            }
137        }
138
139        self.state = PeerState::Disconnected;
140        Ok(())
141    }
142
143    /// Send a HELLO message with federation feature advertised
144    async fn send_hello(&self) -> Result<()> {
145        let hello = Message::Hello(HelloMessage {
146            version: PROTOCOL_VERSION,
147            name: self.config.client_name.clone(),
148            features: self.config.features.clone(),
149            capabilities: None,
150            token: self.config.auth_token.clone(),
151        });
152
153        self.send_message(&hello, QoS::Confirm).await
154    }
155
156    /// Send federation namespace declaration to peer
157    async fn declare_namespaces(&self) -> Result<()> {
158        let msg = Message::FederationSync(FederationSyncMessage {
159            op: FederationOp::DeclareNamespaces,
160            patterns: self.config.owned_namespaces.clone(),
161            revisions: HashMap::new(),
162            since_revision: None,
163            origin: Some(self.config.router_id.clone()),
164        });
165
166        self.send_message(&msg, QoS::Confirm).await
167    }
168
169    /// Subscribe to the peer's namespaces so we receive their updates
170    async fn subscribe_to_peer(&self, patterns: &[String]) -> Result<()> {
171        for (i, pattern) in patterns.iter().enumerate() {
172            let sub = Message::Subscribe(SubscribeMessage {
173                id: (1000 + i) as u32, // Use high IDs to avoid collision with local subs
174                pattern: pattern.clone(),
175                types: vec![],
176                options: None,
177            });
178            self.send_message(&sub, QoS::Confirm).await?;
179        }
180        Ok(())
181    }
182
183    /// Request state sync from peer for a pattern
184    async fn request_sync(&self, pattern: &str, since: Option<u64>) -> Result<()> {
185        let msg = Message::FederationSync(FederationSyncMessage {
186            op: FederationOp::RequestSync,
187            patterns: vec![pattern.to_string()],
188            revisions: HashMap::new(),
189            since_revision: since,
190            origin: Some(self.config.router_id.clone()),
191        });
192
193        self.send_message(&msg, QoS::Confirm).await
194    }
195
196    /// Send our revision vector to the peer for sync negotiation
197    async fn send_revision_vector(&self) -> Result<()> {
198        let msg = Message::FederationSync(FederationSyncMessage {
199            op: FederationOp::RevisionVector,
200            patterns: vec![],
201            revisions: self.revision_vector.clone(),
202            since_revision: None,
203            origin: Some(self.config.router_id.clone()),
204        });
205
206        self.send_message(&msg, QoS::Confirm).await
207    }
208
209    /// Forward a local SET to the peer (if it matches the peer's namespaces)
210    pub async fn forward_set(&self, msg: &SetMessage, origin: &str) -> Result<()> {
211        // Don't forward messages that originated from this peer (loop prevention)
212        if let Some(ref peer) = self.peer {
213            if origin == peer.router_id {
214                return Ok(());
215            }
216        }
217
218        let set = Message::Set(SetMessage {
219            address: msg.address.clone(),
220            value: msg.value.clone(),
221            revision: msg.revision,
222            lock: false,
223            unlock: false,
224            ttl: None,
225        });
226
227        self.send_message(&set, QoS::Confirm).await
228    }
229
230    /// Forward a local PUBLISH to the peer
231    pub async fn forward_publish(&self, msg: &Message, origin: &str) -> Result<()> {
232        // Don't forward messages that originated from this peer
233        if let Some(ref peer) = self.peer {
234            if origin == peer.router_id {
235                return Ok(());
236            }
237        }
238
239        self.send_message(msg, QoS::Fire).await
240    }
241
242    /// Get the peer info (if handshake is complete)
243    pub fn peer(&self) -> Option<&PeerInfo> {
244        self.peer.as_ref()
245    }
246
247    /// Get the current connection state
248    pub fn state(&self) -> PeerState {
249        self.state
250    }
251
252    /// Check if the link is actively connected
253    pub fn is_active(&self) -> bool {
254        self.state == PeerState::Active
255    }
256
257    // =========================================================================
258    // Internal methods
259    // =========================================================================
260
261    /// Handle incoming data from the peer
262    async fn handle_data(&mut self, data: &[u8]) -> Result<()> {
263        let (msg, _frame) =
264            codec::decode(data).map_err(|e| FederationError::Codec(e.to_string()))?;
265
266        match msg {
267            Message::Welcome(welcome) => {
268                info!(
269                    "Federation handshake: received WELCOME from '{}' (session: {})",
270                    welcome.name, welcome.session
271                );
272
273                // Initialize peer info
274                self.peer = Some(PeerInfo {
275                    router_id: welcome.session.clone(),
276                    session_id: Some(welcome.session),
277                    namespaces: vec![],
278                    endpoint: None,
279                    outbound: true,
280                    state: PeerState::Handshaking,
281                });
282
283                // Declare our namespaces to the peer
284                self.declare_namespaces().await?;
285                self.state = PeerState::Syncing;
286            }
287
288            Message::FederationSync(fed_msg) => {
289                self.handle_federation_sync(fed_msg).await?;
290            }
291
292            Message::Set(set_msg) => {
293                // Peer sent us a SET -- apply it locally
294                let origin = self
295                    .peer
296                    .as_ref()
297                    .map(|p| p.router_id.clone())
298                    .unwrap_or_default();
299
300                // Track revision
301                if let Some(rev) = set_msg.revision {
302                    self.revision_vector.insert(set_msg.address.clone(), rev);
303                }
304
305                let _ = self
306                    .event_tx
307                    .send(LinkEvent::RemoteSet {
308                        address: set_msg.address,
309                        value: set_msg.value,
310                        revision: set_msg.revision,
311                        origin,
312                    })
313                    .await;
314            }
315
316            Message::Publish(_) => {
317                let origin = self
318                    .peer
319                    .as_ref()
320                    .map(|p| p.router_id.clone())
321                    .unwrap_or_default();
322
323                let _ = self
324                    .event_tx
325                    .send(LinkEvent::RemotePublish {
326                        message: msg,
327                        origin,
328                    })
329                    .await;
330            }
331
332            Message::Snapshot(snapshot) => {
333                // Initial snapshot from peer after subscribe
334                let origin = self
335                    .peer
336                    .as_ref()
337                    .map(|p| p.router_id.clone())
338                    .unwrap_or_default();
339
340                for param in snapshot.params {
341                    self.revision_vector
342                        .insert(param.address.clone(), param.revision);
343                    let _ = self
344                        .event_tx
345                        .send(LinkEvent::RemoteSet {
346                            address: param.address,
347                            value: param.value,
348                            revision: Some(param.revision),
349                            origin: origin.clone(),
350                        })
351                        .await;
352                }
353            }
354
355            Message::Ack(_) => {
356                // Acknowledged, no action needed
357            }
358
359            Message::Error(err) => {
360                warn!(
361                    "Federation peer error: {} (code: {})",
362                    err.message, err.code
363                );
364            }
365
366            Message::Ping => {
367                // Respond with pong
368                self.send_message(&Message::Pong, QoS::Fire).await?;
369            }
370
371            _ => {
372                debug!(
373                    "Federation link: ignoring message type {:?}",
374                    msg.type_code()
375                );
376            }
377        }
378
379        Ok(())
380    }
381
382    /// Handle a FederationSync message from the peer
383    async fn handle_federation_sync(&mut self, msg: FederationSyncMessage) -> Result<()> {
384        match msg.op {
385            FederationOp::DeclareNamespaces => {
386                let router_id = msg
387                    .origin
388                    .clone()
389                    .or_else(|| self.peer.as_ref().map(|p| p.router_id.clone()))
390                    .unwrap_or_default();
391
392                info!("Peer {} declares namespaces: {:?}", router_id, msg.patterns);
393
394                // Update peer info
395                if let Some(ref mut peer) = self.peer {
396                    peer.namespaces = msg.patterns.clone();
397                }
398
399                // Notify local router
400                let _ = self
401                    .event_tx
402                    .send(LinkEvent::PeerNamespaces {
403                        router_id: router_id.clone(),
404                        patterns: msg.patterns.clone(),
405                    })
406                    .await;
407
408                // Subscribe to the peer's namespaces
409                self.subscribe_to_peer(&msg.patterns).await?;
410
411                // Request initial sync
412                for pattern in &msg.patterns {
413                    self.request_sync(pattern, None).await?;
414                }
415            }
416
417            FederationOp::RequestSync => {
418                debug!("Peer requests sync for patterns: {:?}", msg.patterns);
419                // The local router should handle this by sending a snapshot
420                // For now, send our revision vector so the peer knows what we have
421                self.send_revision_vector().await?;
422            }
423
424            FederationOp::RevisionVector => {
425                debug!(
426                    "Received revision vector with {} entries",
427                    msg.revisions.len()
428                );
429                // Compare with our local state to identify what needs syncing
430                // For now, just store the peer's revision vector for reference
431            }
432
433            FederationOp::SyncComplete => {
434                let router_id = self
435                    .peer
436                    .as_ref()
437                    .map(|p| p.router_id.clone())
438                    .unwrap_or_default();
439
440                let pattern = msg.patterns.first().cloned().unwrap_or_default();
441                let revision = msg.since_revision.unwrap_or(0);
442
443                info!(
444                    "Sync complete for pattern '{}' at revision {}",
445                    pattern, revision
446                );
447
448                if let Some(ref mut peer) = self.peer {
449                    peer.state = PeerState::Active;
450                }
451                self.state = PeerState::Active;
452
453                let _ = self
454                    .event_tx
455                    .send(LinkEvent::SyncComplete {
456                        router_id,
457                        pattern,
458                        revision,
459                    })
460                    .await;
461
462                let _ = self
463                    .event_tx
464                    .send(LinkEvent::Connected {
465                        router_id: self
466                            .peer
467                            .as_ref()
468                            .map(|p| p.router_id.clone())
469                            .unwrap_or_default(),
470                    })
471                    .await;
472            }
473        }
474
475        Ok(())
476    }
477
478    /// Encode and send a message to the peer
479    async fn send_message(&self, msg: &Message, _qos: QoS) -> Result<()> {
480        let data = codec::encode(msg).map_err(|e| FederationError::Codec(e.to_string()))?;
481        self.sender
482            .send(data)
483            .await
484            .map_err(|e| FederationError::Transport(e.to_string()))
485    }
486}