Skip to main content

clasp_federation/
manager.rs

1//! Federation manager -- orchestrates federation links
2//!
3//! The FederationManager owns all outbound federation links and
4//! coordinates namespace management, message forwarding, and
5//! reconnection logic.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::{mpsc, RwLock};
10use tracing::{info, warn};
11
12use crate::config::{FederationConfig, FederationMode, PeerInfo, PeerState};
13use crate::link::{FederationLink, LinkEvent};
14use crate::namespace::NamespaceManager;
15
16/// Federation manager that coordinates all peer connections.
17///
18/// The manager maintains a set of federation links and a namespace
19/// registry, routing messages to the appropriate peers based on
20/// address patterns.
21pub struct FederationManager {
22    /// Federation configuration
23    config: FederationConfig,
24    /// Namespace manager (shared with links)
25    namespaces: Arc<RwLock<NamespaceManager>>,
26    /// Active peer connections (router_id -> peer info)
27    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
28    /// Channel for receiving events from links
29    event_rx: Option<mpsc::Receiver<LinkEvent>>,
30    /// Event sender (cloned to each link)
31    event_tx: mpsc::Sender<LinkEvent>,
32}
33
34impl FederationManager {
35    /// Create a new federation manager
36    pub fn new(config: FederationConfig) -> Self {
37        let (event_tx, event_rx) = mpsc::channel(1024);
38        let namespaces = NamespaceManager::new(config.owned_namespaces.clone());
39
40        Self {
41            config,
42            namespaces: Arc::new(RwLock::new(namespaces)),
43            peers: Arc::new(RwLock::new(HashMap::new())),
44            event_rx: Some(event_rx),
45            event_tx,
46        }
47    }
48
49    /// Get the event sender for creating new links
50    pub fn event_sender(&self) -> mpsc::Sender<LinkEvent> {
51        self.event_tx.clone()
52    }
53
54    /// Take the event receiver (can only be called once).
55    ///
56    /// The caller should process events in a loop to handle
57    /// messages from federation peers.
58    pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<LinkEvent>> {
59        self.event_rx.take()
60    }
61
62    /// Create a federation link for an established transport connection.
63    ///
64    /// The link will perform the CLASP handshake, exchange federation metadata,
65    /// and begin relaying messages. Call `link.run(receiver)` to start it.
66    pub fn create_link(&self, sender: Arc<dyn clasp_transport::TransportSender>) -> FederationLink {
67        FederationLink::new(self.config.clone(), sender, self.event_tx.clone())
68    }
69
70    /// Process a link event, updating internal state.
71    ///
72    /// Call this for each event received from `take_event_receiver()`.
73    /// Returns the event for further processing by the router.
74    pub async fn process_event(&self, event: &LinkEvent) {
75        match event {
76            LinkEvent::PeerNamespaces {
77                router_id,
78                patterns,
79            } => {
80                info!("Registering peer {} namespaces: {:?}", router_id, patterns);
81                self.namespaces
82                    .write()
83                    .await
84                    .register_peer(router_id, patterns.clone());
85
86                self.peers
87                    .write()
88                    .await
89                    .entry(router_id.clone())
90                    .and_modify(|p| {
91                        p.namespaces = patterns.clone();
92                        p.state = PeerState::Syncing;
93                    })
94                    .or_insert_with(|| PeerInfo {
95                        router_id: router_id.clone(),
96                        session_id: None,
97                        namespaces: patterns.clone(),
98                        endpoint: None,
99                        outbound: true,
100                        state: PeerState::Syncing,
101                    });
102
103                // Check for conflicts
104                let conflicts = self.namespaces.read().await.find_conflicts();
105                for (pattern, a, b) in &conflicts {
106                    warn!(
107                        "Namespace conflict detected: {} between {} and {}",
108                        pattern, a, b
109                    );
110                }
111            }
112
113            LinkEvent::Connected { router_id } => {
114                info!("Federation peer connected: {}", router_id);
115                if let Some(peer) = self.peers.write().await.get_mut(router_id) {
116                    peer.state = PeerState::Active;
117                }
118            }
119
120            LinkEvent::Disconnected { router_id, reason } => {
121                info!(
122                    "Federation peer disconnected: {} (reason: {:?})",
123                    router_id, reason
124                );
125                self.namespaces.write().await.remove_peer(router_id);
126                if let Some(peer) = self.peers.write().await.get_mut(router_id) {
127                    peer.state = PeerState::Disconnected;
128                }
129            }
130
131            LinkEvent::SyncComplete {
132                router_id,
133                pattern,
134                revision,
135            } => {
136                info!(
137                    "Sync complete with peer {} for {} at rev {}",
138                    router_id, pattern, revision
139                );
140            }
141
142            // RemoteSet and RemotePublish are handled by the router, not here
143            _ => {}
144        }
145    }
146
147    /// Check if an address should be forwarded to federation peers
148    pub async fn should_forward(&self, address: &str, origin: Option<&str>) -> bool {
149        let ns = self.namespaces.read().await;
150        !ns.peers_for_address(address, origin).is_empty()
151    }
152
153    /// Get peers that should receive a message for the given address
154    pub async fn peers_for_address(
155        &self,
156        address: &str,
157        exclude_origin: Option<&str>,
158    ) -> Vec<String> {
159        self.namespaces
160            .read()
161            .await
162            .peers_for_address(address, exclude_origin)
163    }
164
165    /// Get information about a peer
166    pub async fn peer_info(&self, router_id: &str) -> Option<PeerInfo> {
167        self.peers.read().await.get(router_id).cloned()
168    }
169
170    /// Get all active peers
171    pub async fn active_peers(&self) -> Vec<PeerInfo> {
172        self.peers
173            .read()
174            .await
175            .values()
176            .filter(|p| p.state == PeerState::Active)
177            .cloned()
178            .collect()
179    }
180
181    /// Get peer count
182    pub async fn peer_count(&self) -> usize {
183        self.peers.read().await.len()
184    }
185
186    /// Get the federation mode
187    pub fn mode(&self) -> &FederationMode {
188        &self.config.mode
189    }
190
191    /// Get the local router ID
192    pub fn router_id(&self) -> &str {
193        &self.config.router_id
194    }
195
196    /// Get local namespace patterns
197    pub fn owned_namespaces(&self) -> &[String] {
198        &self.config.owned_namespaces
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205
206    fn test_config() -> FederationConfig {
207        FederationConfig {
208            router_id: "test-router".to_string(),
209            owned_namespaces: vec!["/local/**".to_string()],
210            ..Default::default()
211        }
212    }
213
214    #[tokio::test]
215    async fn test_manager_creation() {
216        let manager = FederationManager::new(test_config());
217        assert_eq!(manager.router_id(), "test-router");
218        assert_eq!(manager.peer_count().await, 0);
219    }
220
221    #[tokio::test]
222    async fn test_process_peer_namespaces() {
223        let manager = FederationManager::new(test_config());
224
225        let event = LinkEvent::PeerNamespaces {
226            router_id: "peer-a".to_string(),
227            patterns: vec!["/remote/**".to_string()],
228        };
229
230        manager.process_event(&event).await;
231
232        assert_eq!(manager.peer_count().await, 1);
233        assert!(manager.should_forward("/remote/foo", None).await);
234        assert!(!manager.should_forward("/local/foo", None).await);
235    }
236
237    #[tokio::test]
238    async fn test_process_disconnect() {
239        let manager = FederationManager::new(test_config());
240
241        // Connect
242        let event = LinkEvent::PeerNamespaces {
243            router_id: "peer-a".to_string(),
244            patterns: vec!["/remote/**".to_string()],
245        };
246        manager.process_event(&event).await;
247        assert!(manager.should_forward("/remote/foo", None).await);
248
249        // Disconnect
250        let event = LinkEvent::Disconnected {
251            router_id: "peer-a".to_string(),
252            reason: None,
253        };
254        manager.process_event(&event).await;
255        assert!(!manager.should_forward("/remote/foo", None).await);
256    }
257
258    #[tokio::test]
259    async fn test_origin_exclusion() {
260        let manager = FederationManager::new(test_config());
261
262        let event = LinkEvent::PeerNamespaces {
263            router_id: "peer-a".to_string(),
264            patterns: vec!["/shared/**".to_string()],
265        };
266        manager.process_event(&event).await;
267
268        // Should forward to peer-a
269        let peers = manager.peers_for_address("/shared/foo", None).await;
270        assert_eq!(peers.len(), 1);
271
272        // Should not forward back to origin
273        let peers = manager
274            .peers_for_address("/shared/foo", Some("peer-a"))
275            .await;
276        assert!(peers.is_empty());
277    }
278}