clasp_federation/
manager.rs1use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::{mpsc, RwLock};
10use tracing::{info, warn};
11
12use crate::config::{FederationConfig, FederationMode, PeerInfo, PeerState};
13use crate::link::{FederationLink, LinkEvent};
14use crate::namespace::NamespaceManager;
15
16pub struct FederationManager {
22 config: FederationConfig,
24 namespaces: Arc<RwLock<NamespaceManager>>,
26 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
28 event_rx: Option<mpsc::Receiver<LinkEvent>>,
30 event_tx: mpsc::Sender<LinkEvent>,
32}
33
34impl FederationManager {
35 pub fn new(config: FederationConfig) -> Self {
37 let (event_tx, event_rx) = mpsc::channel(1024);
38 let namespaces = NamespaceManager::new(config.owned_namespaces.clone());
39
40 Self {
41 config,
42 namespaces: Arc::new(RwLock::new(namespaces)),
43 peers: Arc::new(RwLock::new(HashMap::new())),
44 event_rx: Some(event_rx),
45 event_tx,
46 }
47 }
48
49 pub fn event_sender(&self) -> mpsc::Sender<LinkEvent> {
51 self.event_tx.clone()
52 }
53
54 pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<LinkEvent>> {
59 self.event_rx.take()
60 }
61
62 pub fn create_link(&self, sender: Arc<dyn clasp_transport::TransportSender>) -> FederationLink {
67 FederationLink::new(self.config.clone(), sender, self.event_tx.clone())
68 }
69
70 pub async fn process_event(&self, event: &LinkEvent) {
75 match event {
76 LinkEvent::PeerNamespaces {
77 router_id,
78 patterns,
79 } => {
80 info!("Registering peer {} namespaces: {:?}", router_id, patterns);
81 self.namespaces
82 .write()
83 .await
84 .register_peer(router_id, patterns.clone());
85
86 self.peers
87 .write()
88 .await
89 .entry(router_id.clone())
90 .and_modify(|p| {
91 p.namespaces = patterns.clone();
92 p.state = PeerState::Syncing;
93 })
94 .or_insert_with(|| PeerInfo {
95 router_id: router_id.clone(),
96 session_id: None,
97 namespaces: patterns.clone(),
98 endpoint: None,
99 outbound: true,
100 state: PeerState::Syncing,
101 });
102
103 let conflicts = self.namespaces.read().await.find_conflicts();
105 for (pattern, a, b) in &conflicts {
106 warn!(
107 "Namespace conflict detected: {} between {} and {}",
108 pattern, a, b
109 );
110 }
111 }
112
113 LinkEvent::Connected { router_id } => {
114 info!("Federation peer connected: {}", router_id);
115 if let Some(peer) = self.peers.write().await.get_mut(router_id) {
116 peer.state = PeerState::Active;
117 }
118 }
119
120 LinkEvent::Disconnected { router_id, reason } => {
121 info!(
122 "Federation peer disconnected: {} (reason: {:?})",
123 router_id, reason
124 );
125 self.namespaces.write().await.remove_peer(router_id);
126 if let Some(peer) = self.peers.write().await.get_mut(router_id) {
127 peer.state = PeerState::Disconnected;
128 }
129 }
130
131 LinkEvent::SyncComplete {
132 router_id,
133 pattern,
134 revision,
135 } => {
136 info!(
137 "Sync complete with peer {} for {} at rev {}",
138 router_id, pattern, revision
139 );
140 }
141
142 _ => {}
144 }
145 }
146
147 pub async fn should_forward(&self, address: &str, origin: Option<&str>) -> bool {
149 let ns = self.namespaces.read().await;
150 !ns.peers_for_address(address, origin).is_empty()
151 }
152
153 pub async fn peers_for_address(
155 &self,
156 address: &str,
157 exclude_origin: Option<&str>,
158 ) -> Vec<String> {
159 self.namespaces
160 .read()
161 .await
162 .peers_for_address(address, exclude_origin)
163 }
164
165 pub async fn peer_info(&self, router_id: &str) -> Option<PeerInfo> {
167 self.peers.read().await.get(router_id).cloned()
168 }
169
170 pub async fn active_peers(&self) -> Vec<PeerInfo> {
172 self.peers
173 .read()
174 .await
175 .values()
176 .filter(|p| p.state == PeerState::Active)
177 .cloned()
178 .collect()
179 }
180
181 pub async fn peer_count(&self) -> usize {
183 self.peers.read().await.len()
184 }
185
186 pub fn mode(&self) -> &FederationMode {
188 &self.config.mode
189 }
190
191 pub fn router_id(&self) -> &str {
193 &self.config.router_id
194 }
195
196 pub fn owned_namespaces(&self) -> &[String] {
198 &self.config.owned_namespaces
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205
206 fn test_config() -> FederationConfig {
207 FederationConfig {
208 router_id: "test-router".to_string(),
209 owned_namespaces: vec!["/local/**".to_string()],
210 ..Default::default()
211 }
212 }
213
214 #[tokio::test]
215 async fn test_manager_creation() {
216 let manager = FederationManager::new(test_config());
217 assert_eq!(manager.router_id(), "test-router");
218 assert_eq!(manager.peer_count().await, 0);
219 }
220
221 #[tokio::test]
222 async fn test_process_peer_namespaces() {
223 let manager = FederationManager::new(test_config());
224
225 let event = LinkEvent::PeerNamespaces {
226 router_id: "peer-a".to_string(),
227 patterns: vec!["/remote/**".to_string()],
228 };
229
230 manager.process_event(&event).await;
231
232 assert_eq!(manager.peer_count().await, 1);
233 assert!(manager.should_forward("/remote/foo", None).await);
234 assert!(!manager.should_forward("/local/foo", None).await);
235 }
236
237 #[tokio::test]
238 async fn test_process_disconnect() {
239 let manager = FederationManager::new(test_config());
240
241 let event = LinkEvent::PeerNamespaces {
243 router_id: "peer-a".to_string(),
244 patterns: vec!["/remote/**".to_string()],
245 };
246 manager.process_event(&event).await;
247 assert!(manager.should_forward("/remote/foo", None).await);
248
249 let event = LinkEvent::Disconnected {
251 router_id: "peer-a".to_string(),
252 reason: None,
253 };
254 manager.process_event(&event).await;
255 assert!(!manager.should_forward("/remote/foo", None).await);
256 }
257
258 #[tokio::test]
259 async fn test_origin_exclusion() {
260 let manager = FederationManager::new(test_config());
261
262 let event = LinkEvent::PeerNamespaces {
263 router_id: "peer-a".to_string(),
264 patterns: vec!["/shared/**".to_string()],
265 };
266 manager.process_event(&event).await;
267
268 let peers = manager.peers_for_address("/shared/foo", None).await;
270 assert_eq!(peers.len(), 1);
271
272 let peers = manager
274 .peers_for_address("/shared/foo", Some("peer-a"))
275 .await;
276 assert!(peers.is_empty());
277 }
278}