Skip to main content

slim_controller/
service.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use std::time::Duration;
9use std::vec;
10
11use display_error_chain::ErrorChainExt;
12use slim_config::component::id::ID;
13use slim_config::grpc::server::ServerConfig;
14use slim_session::SessionMessage;
15use slim_session::subscription_manager::SubscriptionManager;
16use tokio::sync::mpsc;
17use tokio::task::JoinHandle;
18use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
19use tokio_util::sync::CancellationToken;
20use tonic::{Request, Response, Status};
21use tracing::{debug, error, info};
22
23use crate::api::proto::api::v1::control_message::Payload;
24use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
25use crate::api::proto::api::v1::{
26    self, ConnectionDetails, ConnectionDirection, ConnectionListResponse, ConnectionType,
27    SubscriptionListResponse,
28};
29use crate::api::proto::api::v1::{
30    Ack, ConnectionEntry, ControlMessage, SubscriptionEntry,
31    controller_service_client::ControllerServiceClient,
32    controller_service_server::ControllerService as GrpcControllerService,
33};
34use crate::errors::ControllerError;
35use prost_types::Struct;
36use slim_auth::auth_provider::{AuthProvider, AuthVerifier};
37use slim_auth::traits::TokenProvider;
38use slim_config::grpc::client::ClientConfig;
39use slim_datapath::api::{
40    CommandPayload, Content, MessageType::Link as LinkType, MessageType::Publish,
41    MessageType::Subscribe, MessageType::SubscriptionAck as SubscriptionAckType,
42    MessageType::Unsubscribe, ProtoMessage as DataPlaneMessage,
43};
44use slim_datapath::api::{ProtoSessionMessageType, ProtoSessionType};
45use slim_datapath::message_processing::MessageProcessor;
46use slim_datapath::messages::Name;
47use slim_datapath::messages::encoder::calculate_hash;
48use slim_datapath::messages::utils::{DELETE_GROUP, IS_MODERATOR, SlimHeaderFlags, TRUE_VAL};
49use slim_datapath::tables::SubscriptionTable;
50
51use slim_session::timer::{Timer, TimerType};
52use slim_session::timer_factory::{TimerFactory, TimerSettings};
53
54type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
55type TxChannels = HashMap<String, TxChannel>;
56
/// Component string used for each of the three segments of the controller's name.
const CONTROLLER_COMPONENT: &str = "controller";

/// Intended cap on queued subscription notifications, so the queue cannot grow without bound.
const MAX_QUEUED_NOTIFICATIONS: usize = 1000;
61
62/// Settings struct for creating a ControlPlane instance
63#[derive(Clone)]
64pub struct ControlPlaneSettings {
65    /// ID of this SLIM instance
66    pub id: ID,
67    /// Optional group name
68    pub group_name: Option<String>,
69    /// Server configurations
70    pub servers: Vec<ServerConfig>,
71    /// Client configurations
72    pub clients: Vec<ClientConfig>,
73    /// Message processor instance
74    pub message_processor: Arc<MessageProcessor>,
75    /// Optional authentication provider
76    pub auth_provider: Option<AuthProvider>,
77    /// Optional authentication verifier
78    pub auth_verifier: Option<AuthVerifier>,
79    /// array of connection details used by the control
80    /// plane to store the connection settings (e.g., TLS settings).
81    pub connection_details: Vec<ConnectionDetails>,
82}
83
84/// Inner structure for the controller service
85/// This structure holds the internal state of the controller service,
86/// including the ID, message processor, connections, and channels.
87/// It is normally wrapped in an Arc to allow shared ownership across multiple threads.
88struct ControllerServiceInternal {
89    /// ID of this SLIM instance
90    id: ID,
91
92    /// controller name
93    controller_name: slim_datapath::messages::Name,
94
95    /// optional group name
96    group_name: Option<String>,
97
98    /// underlying message processor
99    message_processor: Arc<MessageProcessor>,
100
101    /// map of connection IDs to their configuration
102    connections: Arc<parking_lot::RwLock<HashMap<String, u64>>>,
103
104    /// channel to send messages into the datapath
105    tx_slim: mpsc::Sender<Result<DataPlaneMessage, Status>>,
106
107    /// channels to send control messages
108    tx_channels: parking_lot::RwLock<TxChannels>,
109
110    /// cancellation token for graceful shutdown
111    cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,
112
113    /// drain watch channel
114    drain_watch: parking_lot::RwLock<Option<drain::Watch>>,
115
116    /// authentication provider for adding authentication to outgoing messages to clients
117    auth_provider: Option<AuthProvider>,
118
119    /// authentication verifier for verifying incoming messages from clients
120    _auth_verifier: Option<AuthVerifier>,
121
122    /// queue for pending subscription notifications when connections are down
123    pending_notifications: Arc<parking_lot::Mutex<Vec<ControlMessage>>>,
124
125    /// Manages pending subscription ack tracking (id generation, registration, resolution).
126    subscription_manager: SubscriptionManager,
127
128    /// map of generated u32 keys to original string message IDs and their associated timers
129    message_id_map: Arc<parking_lot::RwLock<HashMap<u32, (String, Option<Timer>)>>>,
130
131    /// timer factory for controller messages
132    /// used to create timers for messages that require timeouts
133    /// the lock is needed to set the timer factory after initialization
134    /// because it requires a channel to send session messages
135    timer_factory: parking_lot::RwLock<Option<TimerFactory>>,
136
137    /// connection details used by control plane to store connection settings
138    connection_details: Vec<ConnectionDetails>,
139
140    /// Maps (subscription_name, connection_id) → subscription_id for route tracking
141    route_subscription_ids: parking_lot::Mutex<HashMap<(Name, u64), u64>>,
142}
143
144#[derive(Clone)]
145struct ControllerService {
146    /// internal service state
147    inner: Arc<ControllerServiceInternal>,
148}
149
150/// The ControlPlane service is the main entry point for the controller service.
151pub struct ControlPlane {
152    /// servers
153    servers: Vec<ServerConfig>,
154
155    /// clients
156    clients: Vec<ClientConfig>,
157
158    /// drain signal channel
159    drain_signal: parking_lot::RwLock<Option<drain::Signal>>,
160
161    /// controller
162    controller: ControllerService,
163
164    /// channel to receive message from the datapath
165    /// to be used in listen_from_data_plane
166    rx_slim_option: Option<mpsc::Receiver<Result<DataPlaneMessage, Status>>>,
167}
168
169/// ControllerServiceInternal implements Drop trait to cancel all running listeners and
170/// clean up resources.
171impl Drop for ControlPlane {
172    fn drop(&mut self) {
173        // cancel all running listeners
174        for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
175            token.cancel();
176        }
177    }
178}
179
180pub(crate) fn from_server_config(server_config: &ServerConfig) -> ConnectionDetails {
181    // Convert metadata from MetadataMap to proto Struct
182    let metadata = server_config.metadata.as_ref().map(|m| {
183        let fields = m
184            .inner
185            .iter()
186            .map(|(k, v)| (k.clone(), prost_types::Value::from(v)))
187            .collect();
188        Struct { fields }
189    });
190
191    // Serialize auth config to JSON string
192    let auth = serde_json::to_string(&server_config.auth).ok();
193
194    // Serialize tls config to JSON string
195    let tls = serde_json::to_string(&server_config.tls_setting.config).ok();
196
197    ConnectionDetails {
198        endpoint: server_config.endpoint.clone(),
199        mtls_required: !server_config.tls_setting.insecure,
200        metadata,
201        auth,
202        tls,
203    }
204}
205
206/// ControlPlane implements the service trait for the controller service.
207impl ControlPlane {
208    /// Create a new ControlPlane service instance
209    /// This function initializes the ControlPlane with the given ID, servers, clients, and message processor.
210    /// It also sets up the internal state, including the connections and channels.
211    /// # Arguments
212    /// * `id` - The ID of the SLIM instance.
213    /// * `servers` - A vector of server configurations.
214    /// * `clients` - A vector of client configurations.
215    /// * `drain_rx` - A drain watch channel for graceful shutdown.
216    /// * `message_processor` - An Arc to the message processor instance.
217    /// * `pubsub_servers` - A slice of server configurations for pub/sub connections.
218    /// # Returns
219    /// A new instance of ControlPlane.
220    pub fn new(config: ControlPlaneSettings) -> Self {
221        // create local connection with the message processor
222        let (_, tx_slim, rx_slim) = config
223            .message_processor
224            .register_local_connection(true)
225            .unwrap();
226
227        let (signal, watch) = drain::channel();
228        let controller_name = Name::from_strings([
229            CONTROLLER_COMPONENT,
230            CONTROLLER_COMPONENT,
231            CONTROLLER_COMPONENT,
232        ])
233        .with_id(rand::random::<u64>());
234        debug!("create controller with name: {}", controller_name);
235
236        ControlPlane {
237            servers: config.servers,
238            clients: config.clients,
239            controller: ControllerService {
240                inner: Arc::new(ControllerServiceInternal {
241                    id: config.id,
242                    controller_name,
243                    group_name: config.group_name,
244                    message_processor: config.message_processor,
245                    connections: Arc::new(parking_lot::RwLock::new(HashMap::new())),
246                    subscription_manager: SubscriptionManager::new(tx_slim.clone()),
247                    tx_slim,
248                    tx_channels: parking_lot::RwLock::new(HashMap::new()),
249                    cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
250                    drain_watch: parking_lot::RwLock::new(Some(watch)),
251                    auth_provider: config.auth_provider,
252                    _auth_verifier: config.auth_verifier,
253                    pending_notifications: Arc::new(parking_lot::Mutex::new(Vec::new())),
254                    message_id_map: Arc::new(parking_lot::RwLock::new(HashMap::new())),
255                    timer_factory: parking_lot::RwLock::new(None),
256                    connection_details: config.connection_details,
257                    route_subscription_ids: parking_lot::Mutex::new(HashMap::new()),
258                }),
259            },
260            drain_signal: parking_lot::RwLock::new(Some(signal)),
261            rx_slim_option: Some(rx_slim),
262        }
263    }
264
265    /// Take an existing ControlPlane instance and return a new one with the provided clients.
266    pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
267        self.clients = clients;
268        self
269    }
270
271    /// Take an existing ControlPlane instance and return a new one with the provided servers.
272    pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
273        self.servers = servers;
274        self
275    }
276
277    /// Run the clients and servers of the ControlPlane service.
278    /// This function starts all the servers and clients defined in the ControlPlane.
279    /// # Returns
280    /// A Result indicating success or failure of the operation.
281    /// # Errors
282    /// If there is an error starting any of the servers or clients, it will return a ControllerError.
283    pub async fn run(&mut self) -> Result<(), ControllerError> {
284        let rx = self
285            .rx_slim_option
286            .take()
287            .ok_or(ControllerError::AlreadyStarted)?;
288
289        // Collect servers to avoid borrowing self both mutably and immutably
290        let servers = self.servers.clone();
291        let clients = self.clients.clone();
292
293        // run all servers
294        for server in servers {
295            self.run_server(server).await?;
296        }
297
298        // run all clients
299        for client in clients {
300            self.run_client(client).await?;
301        }
302
303        self.listen_from_data_plane(rx).await?;
304
305        Ok(())
306    }
307
308    pub async fn shutdown(&self) -> Result<(), ControllerError> {
309        // Get signal drain
310        let signal = self
311            .drain_signal
312            .write()
313            .take()
314            .ok_or(ControllerError::AlreadyStopped)?;
315
316        // Stop everything using the cancellation tokens
317        self.controller
318            .inner
319            .cancellation_tokens
320            .write()
321            .drain()
322            .for_each(|(endpoint, token)| {
323                info!(%endpoint, "stopping");
324                token.cancel();
325            });
326
327        // Drop watch channel
328        self.controller.inner.drain_watch.write().take();
329
330        // Wait for drain to complete
331        signal.drain().await;
332
333        Ok(())
334    }
335
336    async fn listen_from_data_plane(
337        &mut self,
338        mut rx: mpsc::Receiver<Result<DataPlaneMessage, Status>>,
339    ) -> Result<(), ControllerError> {
340        let cancellation_token = CancellationToken::new();
341        let cancellation_token_clone = cancellation_token.clone();
342
343        self.controller
344            .inner
345            .cancellation_tokens
346            .write()
347            .insert("DATA_PLANE".to_string(), cancellation_token_clone);
348
349        let clients = self.clients.clone();
350        let controller = self.controller.clone();
351
352        // Send subscription to data-plane to receive messages for the controller source name
353        let controller_name = self.controller.inner.controller_name.clone();
354        let subscribe_msg = DataPlaneMessage::builder()
355            .source(controller_name.clone())
356            .destination(controller_name.clone())
357            .identity(controller_name.to_string())
358            .build_subscribe()
359            .unwrap();
360
361        controller
362            .inner
363            .tx_slim
364            .send(Ok(subscribe_msg))
365            .await
366            .map_err(|e| {
367                error!(error = %e.chain(), "failed to send subscribe message to data plane");
368                ControllerError::DatapathSendError(e.to_string())
369            })?;
370
371        // Get a drain watch clone
372        let watch = self.controller.drain_watch()?;
373
374        debug!("Starting data plane listener: {}", controller_name);
375        tokio::spawn(async move {
376            let mut drain_fut = std::pin::pin!(watch.signaled());
377            loop {
378                tokio::select! {
379                    next = rx.recv() => {
380                        match next {
381                            Some(res) => {
382                                match res {
383                                    Ok(msg) => {
384                                        debug!("Send sub/unsub/ack to control plane for message: {:?}", msg);
385                                        match msg.get_type() {
386                                            Subscribe(_) => {
387                                                controller.handle_subscribe_message(msg.get_dst(), &clients).await;
388                                            }
389                                            Unsubscribe(_) => {
390                                                controller.handle_unsubscribe_message(msg.get_dst(), &clients).await;
391                                            }
392                                            Publish(_) => {
393                                                if msg.get_session_message_type() == ProtoSessionMessageType::GroupAck {
394                                                    controller.send_ack_message(msg.get_id(), true, &clients).await;
395                                                } else {
396                                                    debug!("Ignoring publish message with session type: {:?}", msg.get_session_message_type());
397                                                }
398                                            }
399                                            LinkType(_) => {
400                                                debug!("received link message from dataplane - this should not happen");
401                                            }
402                                            SubscriptionAckType(_) => {
403                                                controller.inner.subscription_manager.resolve_ack(msg.get_subscription_ack());
404                                            }
405                                        }
406                                    }
407                                    Err(e) => {
408                                        error!(error = %e.chain(), "received error from the data plane");
409                                        continue;
410                                    }
411                                }
412                            }
413                            None => {
414                                debug!("Data plane receiver channel closed.");
415                                break;
416                            }
417                        }
418                    }
419                    _ = cancellation_token.cancelled() => {
420                        debug!("shutting down stream on cancellation token");
421                        break;
422                    }
423                    _ = &mut drain_fut => {
424                        debug!("shutting down stream on drain");
425                        break;
426                    }
427                }
428            }
429        });
430        Ok(())
431    }
432
433    /// Stop the ControlPlane service.
434    /// This function stops all running listeners and cancels any ongoing operations.
435    /// It cleans up the internal state and ensures that all resources are released properly.
436    pub fn stop(&mut self) {
437        info!("stopping controller service");
438
439        // cancel all running listeners
440        for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
441            info!(%endpoint, "stopping");
442            token.cancel();
443        }
444    }
445
446    /// Run a client configuration.
447    /// This function connects to the control plane using the provided client configuration.
448    /// It checks if the client is already running and if not, it starts a new connection.
449    async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
450        if self
451            .controller
452            .inner
453            .cancellation_tokens
454            .read()
455            .contains_key(&client.endpoint)
456        {
457            return Err(ControllerError::ClientAlreadyRunning(client.endpoint));
458        }
459
460        let cancellation_token = CancellationToken::new();
461
462        let tx = self
463            .controller
464            .connect(client.clone(), cancellation_token.clone())
465            .await?;
466
467        // Store the cancellation token in the controller service
468        self.controller
469            .inner
470            .cancellation_tokens
471            .write()
472            .insert(client.endpoint.clone(), cancellation_token);
473
474        // Store the sender in the tx_channels map
475        self.controller
476            .inner
477            .tx_channels
478            .write()
479            .insert(client.endpoint.clone(), tx);
480
481        // return the sender for control messages
482        Ok(())
483    }
484
485    /// Run a server configuration.
486    /// This function starts a server using the provided server configuration.
487    /// It checks if the server is already running and if not, it starts a new server.
488    pub async fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
489        // Check if the server is already running
490        if self
491            .controller
492            .inner
493            .cancellation_tokens
494            .read()
495            .contains_key(&config.endpoint)
496        {
497            error!(endpoint = config.endpoint, "server is already running",);
498            return Err(ControllerError::ServerAlreadyRunning(config.endpoint));
499        }
500
501        let token = config
502            .run_server(
503                &[ControllerServiceServer::new(self.controller.clone())],
504                self.controller.drain_watch()?,
505            )
506            .await?;
507
508        // Store the cancellation token in the controller service
509        self.controller
510            .inner
511            .cancellation_tokens
512            .write()
513            .insert(config.endpoint.clone(), token.clone());
514
515        info!(%config.endpoint, "started controlplane server");
516
517        Ok(())
518    }
519}
520
521fn generate_session_id(moderator: &Name, channel: &Name) -> u32 {
522    // get all the components of the two names
523    // and hash them together to get the session id
524    let mut all: [u64; 8] = [0; 8];
525    let m = moderator.components();
526    let c = channel.components();
527    all[..4].copy_from_slice(m);
528    all[4..].copy_from_slice(c);
529
530    let hash = calculate_hash(&all);
531    (hash ^ (hash >> 32)) as u32
532}
533
534fn get_name_from_string(string_name: &str) -> Result<Name, ControllerError> {
535    let parts: Vec<&str> = string_name.split('/').collect();
536    if parts.len() < 3 {
537        return Err(ControllerError::MalformedName(string_name.to_owned()));
538    }
539
540    if parts.len() == 4 {
541        let id = parts[3]
542            .parse::<u64>()
543            .map_err(|_e| ControllerError::MalformedName(string_name.to_owned()))?;
544        return Ok(Name::from_strings([parts[0], parts[1], parts[2]]).with_id(id));
545    }
546
547    Ok(Name::from_strings([parts[0], parts[1], parts[2]]))
548}
549
550#[allow(clippy::too_many_arguments)]
551fn create_channel_message(
552    source: &Name,
553    destination: &Name,
554    request_type: ProtoSessionMessageType,
555    session_id: u32,
556    message_id: u32,
557    payload: Option<Content>,
558    auth_provider: &Option<AuthProvider>,
559) -> Result<DataPlaneMessage, ControllerError> {
560    // if the auth_provider is set try to get an identity
561    let identity_token = match auth_provider {
562        Some(auth) => auth.get_token()?,
563        None => String::new(),
564    };
565
566    let message = DataPlaneMessage::builder()
567        .source(source.clone())
568        .destination(destination.clone())
569        .identity(&identity_token)
570        .session_type(ProtoSessionType::Multicast)
571        .session_message_type(request_type)
572        .session_id(session_id)
573        .message_id(message_id)
574        .payload(payload.ok_or(ControllerError::PayloadMissing)?)
575        .build_publish()?;
576
577    Ok(message)
578}
579
580fn new_channel_message(
581    controller: &Name,
582    moderator: &Name,
583    channel: &Name,
584    message_id: u32,
585    auth_provider: &Option<AuthProvider>,
586) -> Result<DataPlaneMessage, ControllerError> {
587    let session_id = generate_session_id(moderator, channel);
588
589    let invite_payload = Some(
590        CommandPayload::builder()
591            .join_request(
592                true,
593                Some(10),
594                Some(Duration::from_secs(1)),
595                Some(channel.clone()),
596            )
597            .as_content(),
598    );
599
600    let mut msg = create_channel_message(
601        controller,
602        moderator,
603        ProtoSessionMessageType::JoinRequest,
604        session_id,
605        message_id,
606        invite_payload,
607        auth_provider,
608    )?;
609
610    msg.insert_metadata(IS_MODERATOR.to_string(), TRUE_VAL.to_string());
611    Ok(msg)
612}
613
614fn delete_channel_message(
615    controller: &Name,
616    moderator: &Name,
617    channel_name: &Name,
618    msg_id: u32,
619    auth_provider: &Option<AuthProvider>,
620) -> Result<DataPlaneMessage, ControllerError> {
621    let session_id = generate_session_id(moderator, channel_name);
622
623    let payload = Some(CommandPayload::builder().leave_request(None).as_content());
624
625    let mut msg = create_channel_message(
626        controller,
627        moderator,
628        ProtoSessionMessageType::LeaveRequest,
629        session_id,
630        msg_id,
631        payload,
632        auth_provider,
633    )?;
634
635    msg.insert_metadata(DELETE_GROUP.to_string(), TRUE_VAL.to_string());
636    Ok(msg)
637}
638
639fn invite_participant_message(
640    controller: &Name,
641    moderator: &Name,
642    participant: &Name,
643    channel_name: &Name,
644    msg_id: u32,
645    auth_provider: &Option<AuthProvider>,
646) -> Result<DataPlaneMessage, ControllerError> {
647    let session_id = generate_session_id(moderator, channel_name);
648
649    let payload = Some(
650        CommandPayload::builder()
651            .discovery_request(Some(participant.clone()))
652            .as_content(),
653    );
654
655    let msg = create_channel_message(
656        controller,
657        moderator,
658        ProtoSessionMessageType::DiscoveryRequest,
659        session_id,
660        msg_id,
661        payload,
662        auth_provider,
663    )?;
664
665    Ok(msg)
666}
667
668fn remove_participant_message(
669    controller: &Name,
670    moderator: &Name,
671    participant: &Name,
672    channel_name: &Name,
673    msg_id: u32,
674    auth_provider: &Option<AuthProvider>,
675) -> Result<DataPlaneMessage, ControllerError> {
676    let session_id = generate_session_id(moderator, channel_name);
677
678    let payload = Some(
679        CommandPayload::builder()
680            .leave_request(Some(participant.clone()))
681            .as_content(),
682    );
683
684    let msg = create_channel_message(
685        controller,
686        moderator,
687        ProtoSessionMessageType::LeaveRequest,
688        session_id,
689        msg_id,
690        payload,
691        auth_provider,
692    )?;
693
694    Ok(msg)
695}
696
697impl ControllerService {
698    fn resolve_connection_by_link_id(&self, link_id: &str) -> Result<Option<u64>, String> {
699        let mut resolved: Option<u64> = None;
700
701        self.inner
702            .message_processor
703            .connection_table()
704            .for_each(|id, conn| {
705                if conn.link_id().as_deref() == Some(link_id) && resolved.is_none() {
706                    resolved = Some(id);
707                }
708            });
709
710        Ok(resolved)
711    }
712
713    fn disconnect_connection_by_link_id(&self, link_id: &str) -> Result<(), String> {
714        if link_id.trim().is_empty() {
715            return Err("link_id cannot be empty".to_string());
716        }
717
718        let conn_id = match self.resolve_connection_by_link_id(link_id)? {
719            Some(id) => id,
720            None => {
721                return Err(format!("Connection with link_id {} not found", link_id));
722            }
723        };
724
725        if let Err(e) = self.inner.message_processor.disconnect(conn_id) {
726            // Best-effort delete: local/control-plane connections can lack config_data.
727            info!(
728                link_id = %link_id,
729                conn_id,
730                error = %e,
731                "Disconnect returned an error; continuing delete flow"
732            );
733        }
734
735        // Remove endpoint->conn mapping for this connection id.
736        self.inner
737            .connections
738            .write()
739            .retain(|_, mapped| *mapped != conn_id);
740
741        info!(link_id = %link_id, conn_id, "Successfully deleted connection by link_id");
742        Ok(())
743    }
744
745    fn resolve_subscription_connection(
746        &self,
747        subscription: &v1::Subscription,
748    ) -> Result<Option<u64>, String> {
749        if let Some(link_id) = &subscription.link_id {
750            let trimmed = link_id.trim();
751            if !trimmed.is_empty() {
752                return self.resolve_connection_by_link_id(trimmed);
753            }
754        }
755
756        Ok(self
757            .inner
758            .connections
759            .read()
760            .get(&subscription.connection_id)
761            .cloned())
762    }
763
764    /// Handle new control messages.
765    async fn handle_new_control_message(
766        &self,
767        msg: ControlMessage,
768        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
769    ) -> Result<(), ControllerError> {
770        match msg.payload {
771            Some(ref payload) => {
772                match payload {
773                    Payload::ConfigCommand(config) => {
774                        let mut connections_status = Vec::new();
775                        let mut subscriptions_status = Vec::new();
776
777                        // Process connections to delete by link_id.
778                        for link_id in &config.connections_to_delete {
779                            info!(link_id = %link_id, "received a connection to delete");
780                            let mut connection_success = true;
781                            let mut connection_error_msg = String::new();
782
783                            if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
784                                connection_success = false;
785                                connection_error_msg = err;
786                            }
787
788                            connections_status.push(v1::ConnectionAck {
789                                connection_id: link_id.clone(),
790                                success: connection_success,
791                                error_msg: connection_error_msg,
792                            });
793                        }
794
795                        // Process connections to create
796                        for conn in &config.connections_to_create {
797                            info!(?conn, "received a connection to create");
798                            let mut connection_success = true;
799                            let mut connection_error_msg = String::new();
800
801                            match serde_json::from_str::<ClientConfig>(&conn.config_data) {
802                                Err(e) => {
803                                    connection_success = false;
804                                    connection_error_msg = format!("Failed to parse config: {}", e);
805                                }
806                                Ok(client_config) => {
807                                    let client_endpoint = &client_config.endpoint;
808                                    let requested_link_id =
809                                        if client_config.link_id.trim().is_empty() {
810                                            String::new()
811                                        } else {
812                                            client_config.link_id.clone()
813                                        };
814                                    let mut existing_conn_for_link_id = false;
815
816                                    if !requested_link_id.is_empty() {
817                                        match self.resolve_connection_by_link_id(&requested_link_id)
818                                        {
819                                            Err(err) => {
820                                                connection_success = false;
821                                                connection_error_msg = err;
822                                            }
823                                            Ok(Some(conn_id)) => {
824                                                existing_conn_for_link_id = true;
825                                                self.inner
826                                                    .connections
827                                                    .write()
828                                                    .insert(client_endpoint.clone(), conn_id);
829                                                info!(
830                                                    endpoint = %client_endpoint,
831                                                    link_id = %requested_link_id,
832                                                    conn_id,
833                                                    "Connection already exists for link_id"
834                                                );
835                                            }
836                                            Ok(None) => {}
837                                        }
838                                    }
839
840                                    if connection_success && !existing_conn_for_link_id {
841                                        // connect to an endpoint if it's not already connected
842                                        if !self
843                                            .inner
844                                            .connections
845                                            .read()
846                                            .contains_key(client_endpoint)
847                                        {
848                                            match self
849                                                .inner
850                                                .message_processor
851                                                .connect(client_config.clone(), None, None)
852                                                .await
853                                            {
854                                                Err(e) => {
855                                                    connection_success = false;
856                                                    connection_error_msg =
857                                                        format!("Connection failed: {}", e);
858                                                }
859                                                Ok(conn_id) => {
860                                                    self.inner
861                                                        .connections
862                                                        .write()
863                                                        .insert(client_endpoint.clone(), conn_id.1);
864                                                    info!(
865                                                        endpoint = %client_endpoint, "Successfully created connection",
866
867                                                    );
868                                                }
869                                            }
870                                        } else {
871                                            info!(endpoint = %client_endpoint, "Connection already exists");
872                                        }
873                                    }
874                                }
875                            }
876
877                            // Add connection status
878                            connections_status.push(v1::ConnectionAck {
879                                connection_id: conn.connection_id.clone(),
880                                success: connection_success,
881                                error_msg: connection_error_msg,
882                            });
883                        }
884
885                        // if the auth_provider is set try to get an identity
886                        let identity_token = match &self.inner.auth_provider {
887                            Some(auth) => auth.get_token()?,
888                            None => String::new(),
889                        };
890
891                        // Process subscriptions to set
892                        for subscription in &config.subscriptions_to_set {
893                            let mut subscription_success = true;
894                            let mut subscription_error_msg = String::new();
895
896                            let conn = self.resolve_subscription_connection(subscription);
897
898                            if let Ok(Some(conn)) = conn {
899                                let source = Name::from_strings([
900                                    subscription.component_0.as_str(),
901                                    subscription.component_1.as_str(),
902                                    subscription.component_2.as_str(),
903                                ])
904                                .with_id(0);
905                                let name = Name::from_strings([
906                                    subscription.component_0.as_str(),
907                                    subscription.component_1.as_str(),
908                                    subscription.component_2.as_str(),
909                                ])
910                                .with_id(subscription.id.unwrap_or(Name::NULL_COMPONENT));
911
912                                let msg = DataPlaneMessage::builder()
913                                    .source(source.clone())
914                                    .destination(name.clone())
915                                    .identity(&identity_token)
916                                    .flags(SlimHeaderFlags::default().with_recv_from(conn))
917                                    .build_subscribe()
918                                    .unwrap();
919
920                                match self.send_subscribe_message_with_ack(msg).await {
921                                    Ok(subscription_id) => {
922                                        // Store the subscription_id for later unsubscription
923                                        self.inner
924                                            .route_subscription_ids
925                                            .lock()
926                                            .insert((name.clone(), conn), subscription_id);
927                                        info!(?subscription, "Successfully created subscription");
928                                    }
929                                    Err(err) => {
930                                        subscription_success = false;
931                                        subscription_error_msg =
932                                            format!("Failed to subscribe: {}", err);
933                                    }
934                                }
935                            } else {
936                                subscription_success = false;
937                                subscription_error_msg = match conn {
938                                    Ok(None) => {
939                                        if let Some(link_id) = &subscription.link_id {
940                                            format!("Connection with link_id {} not found", link_id)
941                                        } else {
942                                            format!(
943                                                "Connection {} not found",
944                                                subscription.connection_id
945                                            )
946                                        }
947                                    }
948                                    Err(err) => err,
949                                    _ => "unknown connection lookup error".to_string(),
950                                };
951                            }
952
953                            // Add subscription status
954                            subscriptions_status.push(v1::SubscriptionAck {
955                                subscription: Some(subscription.clone()),
956                                success: subscription_success,
957                                error_msg: subscription_error_msg,
958                            });
959                        }
960
961                        // Process subscriptions to delete
962                        for subscription in &config.subscriptions_to_delete {
963                            let mut subscription_success = true;
964                            let mut subscription_error_msg = String::new();
965
966                            let conn = self.resolve_subscription_connection(subscription);
967
968                            if let Ok(Some(conn)) = conn {
969                                let source = Name::from_strings([
970                                    subscription.component_0.as_str(),
971                                    subscription.component_1.as_str(),
972                                    subscription.component_2.as_str(),
973                                ])
974                                .with_id(0);
975                                let name = Name::from_strings([
976                                    subscription.component_0.as_str(),
977                                    subscription.component_1.as_str(),
978                                    subscription.component_2.as_str(),
979                                ])
980                                .with_id(subscription.id.unwrap_or(Name::NULL_COMPONENT));
981
982                                let msg = DataPlaneMessage::builder()
983                                    .source(source.clone())
984                                    .destination(name.clone())
985                                    .identity(&identity_token)
986                                    .flags(SlimHeaderFlags::default().with_recv_from(conn))
987                                    .build_unsubscribe()
988                                    .unwrap();
989
990                                let sub_id = self
991                                    .inner
992                                    .route_subscription_ids
993                                    .lock()
994                                    .remove(&(name.clone(), conn));
995                                match sub_id {
996                                    Some(subscription_id) => {
997                                        if let Err(err) = self
998                                            .send_unsubscribe_message_with_ack(msg, subscription_id)
999                                            .await
1000                                        {
1001                                            subscription_success = false;
1002                                            subscription_error_msg =
1003                                                format!("Failed to unsubscribe: {}", err);
1004                                        } else {
1005                                            info!(
1006                                                ?subscription,
1007                                                "Successfully deleted subscription"
1008                                            );
1009                                        }
1010                                    }
1011                                    None => {
1012                                        subscription_success = false;
1013                                        subscription_error_msg = format!(
1014                                            "No subscription_id found for ({}, {})",
1015                                            name, conn
1016                                        );
1017                                    }
1018                                }
1019                            } else {
1020                                subscription_success = false;
1021                                subscription_error_msg = match conn {
1022                                    Ok(None) => {
1023                                        if let Some(link_id) = &subscription.link_id {
1024                                            format!("Connection with link_id {} not found", link_id)
1025                                        } else {
1026                                            format!(
1027                                                "Connection {} not found",
1028                                                subscription.connection_id
1029                                            )
1030                                        }
1031                                    }
1032                                    Err(err) => err,
1033                                    _ => "unknown connection lookup error".to_string(),
1034                                };
1035                            }
1036
1037                            // Add subscription status (for deletion)
1038                            subscriptions_status.push(v1::SubscriptionAck {
1039                                subscription: Some(subscription.clone()),
1040                                success: subscription_success,
1041                                error_msg: subscription_error_msg,
1042                            });
1043                        }
1044
1045                        // Send ConfigurationCommandAck with detailed status information
1046                        let config_ack = v1::ConfigurationCommandAck {
1047                            original_message_id: msg.message_id.clone(),
1048                            connections_status,
1049                            subscriptions_status,
1050                        };
1051
1052                        let reply = ControlMessage {
1053                            message_id: uuid::Uuid::new_v4().to_string(),
1054                            payload: Some(Payload::ConfigCommandAck(config_ack)),
1055                        };
1056
1057                        if let Err(e) = tx.send(Ok(reply)).await {
1058                            error!(error = %e.chain(), "failed to send ConfigurationCommandAck");
1059                        }
1060
1061                        info!(
1062                            connections = %config.connections_to_create.len(),
1063                            connections_to_delete = %config.connections_to_delete.len(),
1064                            subscriptions_to_set = %config.subscriptions_to_set.len(),
1065                            subscriptions_to_del = %config.subscriptions_to_delete.len(),
1066                            "Processed ConfigurationCommand"
1067                        );
1068                    }
1069                    Payload::SubscriptionListRequest(_) => {
1070                        const CHUNK_SIZE: usize = 100;
1071
1072                        let conn_table = self.inner.message_processor.connection_table();
1073                        let mut entries = Vec::new();
1074
1075                        self.inner.message_processor.subscription_table().for_each(
1076                            |name, id, local, remote| {
1077                                let mut entry = SubscriptionEntry {
1078                                    component_0: name.components_strings()[0].to_string(),
1079                                    component_1: name.components_strings()[1].to_string(),
1080                                    component_2: name.components_strings()[2].to_string(),
1081                                    id: Some(id),
1082                                    ..Default::default()
1083                                };
1084
1085                                for &cid in local {
1086                                    entry.local_connections.push(ConnectionEntry {
1087                                        id: cid,
1088                                        connection_type: ConnectionType::Local as i32,
1089                                        config_data: "{}".to_string(),
1090                                        link_id: None,
1091                                        direction: ConnectionDirection::Outgoing as i32,
1092                                    });
1093                                }
1094
1095                                for &cid in remote {
1096                                    if let Some(conn) = conn_table.get(cid) {
1097                                        entry.remote_connections.push(ConnectionEntry {
1098                                            id: cid,
1099                                            connection_type: ConnectionType::Remote as i32,
1100                                            config_data: match conn.config_data() {
1101                                                Some(data) => serde_json::to_string(data)
1102                                                    .unwrap_or_else(|_| "{}".to_string()),
1103                                                None => "{}".to_string(),
1104                                            },
1105                                            link_id: conn.link_id(),
1106                                            direction: if conn.is_outgoing() {
1107                                                ConnectionDirection::Outgoing as i32
1108                                            } else {
1109                                                ConnectionDirection::Incoming as i32
1110                                            },
1111                                        });
1112                                    } else {
1113                                        error!(%cid, "no connection entry for id");
1114                                    }
1115                                }
1116                                entries.push(entry);
1117                            },
1118                        );
1119
1120                        for chunk in entries.chunks(CHUNK_SIZE) {
1121                            let resp = ControlMessage {
1122                                message_id: uuid::Uuid::new_v4().to_string(),
1123                                payload: Some(Payload::SubscriptionListResponse(
1124                                    SubscriptionListResponse {
1125                                        original_message_id: msg.message_id.clone(),
1126                                        entries: chunk.to_vec(),
1127                                    },
1128                                )),
1129                            };
1130
1131                            if let Err(e) = tx.try_send(Ok(resp)) {
1132                                error!(error = %e.chain(), "failed to send subscription batch");
1133                            }
1134                        }
1135                    }
1136                    Payload::ConnectionListRequest(_) => {
1137                        let mut all_entries = Vec::new();
1138                        self.inner
1139                            .message_processor
1140                            .connection_table()
1141                            .for_each(|id, conn| {
1142                                info!(
1143                                    conn_id = id,
1144                                    local_addr = ?conn.local_addr(),
1145                                    remote_addr = ?conn.remote_addr(),
1146                                    is_outgoing = conn.is_outgoing(),
1147                                    link_id = ?conn.link_id(),
1148                                    "connection entry",
1149                                );
1150                                all_entries.push(ConnectionEntry {
1151                                    id,
1152                                    connection_type: ConnectionType::Remote as i32,
1153                                    config_data: match conn.config_data() {
1154                                        Some(data) => serde_json::to_string(data)
1155                                            .unwrap_or_else(|_| "{}".to_string()),
1156                                        None => "{}".to_string(),
1157                                    },
1158                                    link_id: conn.link_id(),
1159                                    direction: if conn.is_outgoing() {
1160                                        ConnectionDirection::Outgoing as i32
1161                                    } else {
1162                                        ConnectionDirection::Incoming as i32
1163                                    },
1164                                });
1165                            });
1166
1167                        const CHUNK_SIZE: usize = 100;
1168                        for chunk in all_entries.chunks(CHUNK_SIZE) {
1169                            let resp = ControlMessage {
1170                                message_id: uuid::Uuid::new_v4().to_string(),
1171                                payload: Some(Payload::ConnectionListResponse(
1172                                    ConnectionListResponse {
1173                                        original_message_id: msg.message_id.clone(),
1174                                        entries: chunk.to_vec(),
1175                                    },
1176                                )),
1177                            };
1178
1179                            if let Err(e) = tx.try_send(Ok(resp)) {
1180                                error!(error = %e.chain(), "failed to send connection list batch");
1181                            }
1182                        }
1183                    }
1184                    Payload::Ack(_ack) => {
1185                        // received an ack, do nothing - this should not happen
1186                    }
1187                    Payload::ConfigCommandAck(_) => {
1188                        // received a config command ack, do nothing - this should not happen
1189                    }
1190                    Payload::SubscriptionListResponse(_) => {
1191                        // received a subscription list response, do nothing - this should not happen
1192                    }
1193                    Payload::ConnectionListResponse(_) => {
1194                        // received a connection list response, do nothing - this should not happen
1195                    }
1196                    Payload::RegisterNodeRequest(_) => {
1197                        error!("received a register node request");
1198                    }
1199                    Payload::RegisterNodeResponse(_) => {
1200                        // received a register node response, do nothing
1201                    }
1202                    Payload::DeregisterNodeRequest(_) => {
1203                        error!("received a deregister node request");
1204                    }
1205                    Payload::DeregisterNodeResponse(_) => {
1206                        // received a deregister node response, do nothing
1207                    }
1208                    Payload::CreateChannelRequest(req) => {
1209                        info!("received a channel create request");
1210
1211                        let mut success = true;
1212                        // Get the first moderator from the list, as we support only one for now
1213                        if let Some(first_moderator) = req.moderators.first() {
1214                            let moderator_name = get_name_from_string(first_moderator)?;
1215                            if !moderator_name.has_id() {
1216                                error!("missing moderator ID");
1217                                success = false;
1218                            } else {
1219                                let channel_name = get_name_from_string(&req.channel_name)?;
1220                                let new_msg_id = rand::random::<u32>();
1221                                let controller_name = self.inner.controller_name.clone();
1222                                let creation_msg = new_channel_message(
1223                                    &controller_name,
1224                                    &moderator_name,
1225                                    &channel_name,
1226                                    new_msg_id,
1227                                    &self.inner.auth_provider,
1228                                )?;
1229
1230                                debug!("send session creation message: {:?}", creation_msg);
1231                                if let Err(e) = self.send_control_message(creation_msg).await {
1232                                    error!(error = %e.chain(), "failed to send channel creation");
1233                                    success = false;
1234                                } else {
1235                                    // create timer for the message
1236                                    debug!(
1237                                        "create timer for message id: {} with type {:?}",
1238                                        new_msg_id,
1239                                        ProtoSessionMessageType::JoinRequest
1240                                    );
1241                                    let timer =
1242                                        self.inner.timer_factory.read().as_ref().map(|factory| {
1243                                            factory.create_and_start_timer(
1244                                                new_msg_id,
1245                                                ProtoSessionMessageType::JoinRequest,
1246                                                None,
1247                                            )
1248                                        });
1249                                    self.inner
1250                                        .message_id_map
1251                                        .write()
1252                                        .insert(new_msg_id, (msg.message_id.clone(), timer));
1253                                }
1254                            }
1255                        } else {
1256                            error!("no moderators specified in create channel request message");
1257                            success = false;
1258                        };
1259
1260                        if !success {
1261                            let ack = Ack {
1262                                original_message_id: msg.message_id.clone(),
1263                                success,
1264                                messages: vec![msg.message_id.clone()],
1265                            };
1266
1267                            let reply = ControlMessage {
1268                                message_id: uuid::Uuid::new_v4().to_string(),
1269                                payload: Some(Payload::Ack(ack)),
1270                            };
1271
1272                            if let Err(e) = tx.send(Ok(reply)).await {
1273                                error!(error = %e.chain(), "failed to send ack");
1274                            }
1275                        }
1276                    }
1277                    Payload::DeleteChannelRequest(req) => {
1278                        info!("received a channel delete request");
1279                        let mut success = true;
1280
1281                        // Get the first moderator from the list, as we support only one for now
1282                        if let Some(first_moderator) = req.moderators.first() {
1283                            let moderator_name = get_name_from_string(first_moderator)?;
1284                            if !moderator_name.has_id() {
1285                                error!("missing moderator ID");
1286                                success = false;
1287                            } else {
1288                                let channel_name = get_name_from_string(&req.channel_name)?;
1289                                let new_msg_id = rand::random::<u32>();
1290                                let controller_name = self.inner.controller_name.clone();
1291                                let delete_msg = delete_channel_message(
1292                                    &controller_name,
1293                                    &moderator_name,
1294                                    &channel_name,
1295                                    new_msg_id,
1296                                    &self.inner.auth_provider,
1297                                )?;
1298
1299                                debug!("Send delete session message: {:?}", delete_msg);
1300                                if let Err(e) = self.send_control_message(delete_msg).await {
1301                                    error!(error = %e.chain(), "failed to send delete channel");
1302                                    success = false;
1303                                } else {
1304                                    // create timer for the message
1305                                    debug!(
1306                                        "create timer for message id: {} with type {:?}",
1307                                        new_msg_id,
1308                                        ProtoSessionMessageType::LeaveRequest
1309                                    );
1310                                    let timer =
1311                                        self.inner.timer_factory.read().as_ref().map(|factory| {
1312                                            factory.create_and_start_timer(
1313                                                new_msg_id,
1314                                                ProtoSessionMessageType::LeaveRequest,
1315                                                None,
1316                                            )
1317                                        });
1318
1319                                    self.inner
1320                                        .message_id_map
1321                                        .write()
1322                                        .insert(new_msg_id, (msg.message_id.clone(), timer));
1323                                }
1324                            }
1325                        } else {
1326                            error!("no moderators specified in delete channel request");
1327                            success = false;
1328                        };
1329
1330                        if !success {
1331                            let ack = Ack {
1332                                original_message_id: msg.message_id.clone(),
1333                                success,
1334                                messages: vec![msg.message_id.clone()],
1335                            };
1336
1337                            let reply = ControlMessage {
1338                                message_id: uuid::Uuid::new_v4().to_string(),
1339                                payload: Some(Payload::Ack(ack)),
1340                            };
1341
1342                            if let Err(e) = tx.send(Ok(reply)).await {
1343                                error!(error = %e.chain(), "failed to send ack");
1344                            }
1345                        }
1346                    }
1347                    Payload::AddParticipantRequest(req) => {
1348                        info!(
1349                            channel_name = %req.channel_name,
1350                            participant_name = %req.participant_name,
1351                            "received a participant add request",
1352                        );
1353
1354                        let mut success = true;
1355
1356                        if let Some(first_moderator) = req.moderators.first() {
1357                            let moderator_name = get_name_from_string(first_moderator)?;
1358                            if !moderator_name.has_id() {
1359                                error!("missing moderator ID");
1360                                success = false;
1361                            } else {
1362                                let channel_name = get_name_from_string(&req.channel_name)?;
1363                                let participant_name = get_name_from_string(&req.participant_name)?;
1364                                let new_msg_id = rand::random::<u32>();
1365                                let controller_name = self.inner.controller_name.clone();
1366                                let invite_msg = invite_participant_message(
1367                                    &controller_name,
1368                                    &moderator_name,
1369                                    &participant_name,
1370                                    &channel_name,
1371                                    new_msg_id,
1372                                    &self.inner.auth_provider,
1373                                )?;
1374
1375                                debug!(?invite_msg, "Send invite participant");
1376
1377                                if let Err(e) = self.send_control_message(invite_msg).await {
1378                                    error!(error = %e.chain(), "failed to send channel creation");
1379                                    success = false;
1380                                } else {
1381                                    // create timer for the message
1382                                    debug!(
1383                                        "create timer for message id: {} with type {:?}",
1384                                        new_msg_id,
1385                                        ProtoSessionMessageType::DiscoveryRequest
1386                                    );
1387                                    let timer =
1388                                        self.inner.timer_factory.read().as_ref().map(|factory| {
1389                                            factory.create_and_start_timer(
1390                                                new_msg_id,
1391                                                ProtoSessionMessageType::DiscoveryRequest,
1392                                                None,
1393                                            )
1394                                        });
1395                                    self.inner
1396                                        .message_id_map
1397                                        .write()
1398                                        .insert(new_msg_id, (msg.message_id.clone(), timer));
1399                                }
1400                            }
1401                        } else {
1402                            error!("no moderators specified in add participant request");
1403                        };
1404
1405                        if !success {
1406                            let ack = Ack {
1407                                original_message_id: msg.message_id.clone(),
1408                                success,
1409                                messages: vec![msg.message_id.clone()],
1410                            };
1411
1412                            let reply = ControlMessage {
1413                                message_id: uuid::Uuid::new_v4().to_string(),
1414                                payload: Some(Payload::Ack(ack)),
1415                            };
1416
1417                            if let Err(e) = tx.send(Ok(reply)).await {
1418                                error!(error = %e.chain(), "failed to send ack");
1419                            }
1420                        }
1421                    }
1422                    Payload::DeleteParticipantRequest(req) => {
1423                        info!("received a participant delete request");
1424
1425                        let mut success = true;
1426
1427                        if let Some(first_moderator) = req.moderators.first() {
1428                            let moderator_name = get_name_from_string(first_moderator)?;
1429                            if !moderator_name.has_id() {
1430                                error!("missing moderator ID");
1431                                success = false;
1432                            } else {
1433                                let channel_name = get_name_from_string(&req.channel_name)?;
1434                                let participant_name = get_name_from_string(&req.participant_name)?;
1435                                let new_msg_id = rand::random::<u32>();
1436                                let controller_name = self.inner.controller_name.clone();
1437                                let remove_msg = remove_participant_message(
1438                                    &controller_name,
1439                                    &moderator_name,
1440                                    &participant_name,
1441                                    &channel_name,
1442                                    new_msg_id,
1443                                    &self.inner.auth_provider,
1444                                )?;
1445
1446                                if let Err(e) = self.send_control_message(remove_msg).await {
1447                                    error!(error = %e.chain(), "failed to send delete participant request");
1448                                    success = false;
1449                                } else {
1450                                    // create timer for the message
1451                                    debug!(
1452                                        "create timer for message id: {} with type {:?}",
1453                                        new_msg_id,
1454                                        ProtoSessionMessageType::LeaveRequest
1455                                    );
1456                                    let timer =
1457                                        self.inner.timer_factory.read().as_ref().map(|factory| {
1458                                            factory.create_and_start_timer(
1459                                                new_msg_id,
1460                                                ProtoSessionMessageType::LeaveRequest,
1461                                                None,
1462                                            )
1463                                        });
1464                                    self.inner
1465                                        .message_id_map
1466                                        .write()
1467                                        .insert(new_msg_id, (msg.message_id.clone(), timer));
1468                                }
1469                            }
1470                        } else {
1471                            error!("no moderators specified in remove participant request");
1472                            success = false;
1473                        };
1474
1475                        if !success {
1476                            let ack = Ack {
1477                                original_message_id: msg.message_id.clone(),
1478                                success,
1479                                messages: vec![msg.message_id.clone()],
1480                            };
1481
1482                            let reply = ControlMessage {
1483                                message_id: uuid::Uuid::new_v4().to_string(),
1484                                payload: Some(Payload::Ack(ack)),
1485                            };
1486
1487                            if let Err(e) = tx.send(Ok(reply)).await {
1488                                error!(error = %e.chain(), "failed to send ack");
1489                            }
1490                        }
1491                    }
1492                    Payload::ListChannelRequest(_) => {}
1493                    Payload::ListChannelResponse(_) => {}
1494                    Payload::ListParticipantsRequest(_) => {}
1495                    Payload::ListParticipantsResponse(_) => {}
1496                }
1497            }
1498            None => {
1499                error!(
1500                    message_id = %msg.message_id,
1501                    "received control message with no payload",
1502                );
1503            }
1504        }
1505
1506        Ok(())
1507    }
1508
1509    async fn handle_subscribe_message(&self, dst: Name, clients: &[ClientConfig]) {
1510        let mut sub_vec = vec![];
1511
1512        let components = dst.components_strings();
1513        let cmd = v1::Subscription {
1514            component_0: components[0].to_string(),
1515            component_1: components[1].to_string(),
1516            component_2: components[2].to_string(),
1517            id: Some(dst.id()),
1518            connection_id: "n/a".to_string(),
1519            node_id: None,
1520            link_id: None,
1521            direction: None,
1522        };
1523
1524        sub_vec.push(cmd);
1525
1526        let ctrl = ControlMessage {
1527            message_id: uuid::Uuid::new_v4().to_string(),
1528            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1529                connections_to_create: vec![],
1530                connections_to_delete: vec![],
1531                subscriptions_to_set: sub_vec,
1532                subscriptions_to_delete: vec![],
1533            })),
1534        };
1535
1536        return self.send_or_queue_notification(ctrl, clients).await;
1537    }
1538
1539    async fn handle_unsubscribe_message(&self, dst: Name, clients: &[ClientConfig]) {
1540        let mut unsub_vec = vec![];
1541
1542        let components = dst.components_strings();
1543        let cmd = v1::Subscription {
1544            component_0: components[0].to_string(),
1545            component_1: components[1].to_string(),
1546            component_2: components[2].to_string(),
1547            id: Some(dst.id()),
1548            connection_id: "n/a".to_string(),
1549            node_id: None,
1550            link_id: None,
1551            direction: None,
1552        };
1553
1554        unsub_vec.push(cmd);
1555
1556        let ctrl = ControlMessage {
1557            message_id: uuid::Uuid::new_v4().to_string(),
1558            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1559                connections_to_create: vec![],
1560                connections_to_delete: vec![],
1561                subscriptions_to_set: vec![],
1562                subscriptions_to_delete: unsub_vec,
1563            })),
1564        };
1565
1566        return self.send_or_queue_notification(ctrl, clients).await;
1567    }
1568
1569    /// Send a subscribe message and await the ack. Returns the subscription_id.
1570    async fn send_subscribe_message_with_ack(
1571        &self,
1572        mut msg: DataPlaneMessage,
1573    ) -> Result<u64, String> {
1574        let (ack_id, ack_rx) = self.inner.subscription_manager.register_ack();
1575        msg.set_subscription_id(ack_id);
1576
1577        if let Err(e) = self.send_control_message(msg).await {
1578            self.inner.subscription_manager.cancel_ack(ack_id);
1579            return Err(format!("datapath send error: {}", e.chain()));
1580        }
1581
1582        match ack_rx.await {
1583            Ok(Ok(())) => Ok(ack_id),
1584            Ok(Err(err)) => Err(err.to_string()),
1585            Err(_) => Err("subscription ack channel closed".to_string()),
1586        }
1587    }
1588
1589    /// Send an unsubscribe message with a given subscription_id and await the ack.
1590    async fn send_unsubscribe_message_with_ack(
1591        &self,
1592        mut msg: DataPlaneMessage,
1593        subscription_id: u64,
1594    ) -> Result<(), String> {
1595        let ack_rx = self
1596            .inner
1597            .subscription_manager
1598            .register_ack_with_id(subscription_id);
1599        msg.set_subscription_id(subscription_id);
1600
1601        if let Err(e) = self.send_control_message(msg).await {
1602            self.inner.subscription_manager.cancel_ack(subscription_id);
1603            return Err(format!("datapath send error: {}", e.chain()));
1604        }
1605
1606        match ack_rx.await {
1607            Ok(Ok(())) => Ok(()),
1608            Ok(Err(err)) => Err(err.to_string()),
1609            Err(_) => Err("subscription ack channel closed".to_string()),
1610        }
1611    }
1612
1613    // send an ack back to the control plane. the success field indicates whether the original
1614    // operation was successfully delivered/processed or not.
1615    async fn send_ack_message(&self, msg_id: u32, success: bool, clients: &[ClientConfig]) {
1616        let original_message_id = self.inner.message_id_map.write().remove(&msg_id);
1617        match original_message_id {
1618            Some(entry) => {
1619                debug!("Received GroupAck for message ID: {}", entry.0);
1620                // stop timer and send ack
1621                if let Some(mut timer) = entry.1 {
1622                    timer.stop();
1623                }
1624
1625                let ack = Ack {
1626                    original_message_id: entry.0,
1627                    success,
1628                    messages: vec![msg_id.to_string()],
1629                };
1630
1631                let reply = ControlMessage {
1632                    message_id: uuid::Uuid::new_v4().to_string(),
1633                    payload: Some(Payload::Ack(ack)),
1634                };
1635
1636                self.send_or_queue_notification(reply, clients).await;
1637            }
1638            None => {
1639                debug!("Received GroupAck for unknown message ID: {}", msg_id);
1640            }
1641        }
1642    }
1643
1644    /// Send a control message to SLIM.
1645    async fn send_control_message(&self, msg: DataPlaneMessage) -> Result<(), ControllerError> {
1646        self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
1647            error!(error = %e.chain(), "error sending message into datapath");
1648            ControllerError::Datapath(slim_datapath::errors::DataPathError::ConnectionError)
1649        })
1650    }
1651
1652    /// Send notification to control plane or queue it if no connection is available.
1653    async fn send_or_queue_notification(&self, ctrl_msg: ControlMessage, clients: &[ClientConfig]) {
1654        let mut has_active_connection = false;
1655
1656        // Try to send to all active connections
1657        for c in clients {
1658            let tx = match self.inner.tx_channels.read().get(&c.endpoint) {
1659                Some(tx) => tx.clone(),
1660                None => continue,
1661            };
1662
1663            if tx.send(Ok(ctrl_msg.clone())).await.is_ok() {
1664                has_active_connection = true;
1665            } else {
1666                debug!(
1667                    endpoint = %c.endpoint,
1668                    "failed to send notification to control plane"
1669                );
1670            }
1671        }
1672
1673        // If no active connections, queue the notification
1674        if !has_active_connection {
1675            debug!("no active control plane connections, queuing subscription notification");
1676
1677            let mut queue = self.inner.pending_notifications.lock();
1678            if queue.len() >= MAX_QUEUED_NOTIFICATIONS {
1679                // Remove oldest notification to make room for new one
1680                queue.remove(0);
1681                debug!("queue full, removed oldest notification");
1682            }
1683            queue.push(ctrl_msg);
1684        }
1685    }
1686
1687    /// Get a drain watch clone to pass to a task
1688    fn drain_watch(&self) -> Result<drain::Watch, ControllerError> {
1689        self.inner
1690            .drain_watch
1691            .read()
1692            .clone()
1693            .ok_or(ControllerError::AlreadyStopped)
1694    }
1695
1696    /// Send all queued subscription notifications when connection is restored.
1697    async fn send_queued_notifications(
1698        &self,
1699        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
1700        endpoint: &str,
1701    ) {
1702        let notifications = {
1703            let mut queue = self.inner.pending_notifications.lock();
1704            if queue.is_empty() {
1705                return;
1706            }
1707            queue.drain(..).collect::<Vec<_>>()
1708        };
1709
1710        if notifications.is_empty() {
1711            return;
1712        }
1713
1714        debug!(
1715            "sending {} queued subscription notifications to {}",
1716            notifications.len(),
1717            endpoint
1718        );
1719
1720        let mut failed_notifications = Vec::new();
1721        for notification in notifications {
1722            if let Err(e) = tx.send(Ok(notification)).await {
1723                error!(
1724                    error = %e.chain(),
1725                    %endpoint,
1726                    "failed to send queued notification to control plane",
1727                );
1728
1729                // we can unwrap here because we know we sent a Ok(ControlMessage)
1730                failed_notifications.push(e.0.unwrap());
1731            }
1732        }
1733
1734        // Re-queue any failed notifications
1735        if !failed_notifications.is_empty() {
1736            self.inner
1737                .pending_notifications
1738                .lock()
1739                .extend(failed_notifications);
1740        }
1741    }
1742
1743    /// Process the control message stream.
1744    fn process_control_message_stream(
1745        &self,
1746        config: Option<ClientConfig>,
1747        mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
1748        mut timer_rx: Option<mpsc::Receiver<SessionMessage>>,
1749        tx: mpsc::Sender<Result<ControlMessage, Status>>,
1750        cancellation_token: CancellationToken,
1751    ) -> Result<JoinHandle<()>, ControllerError> {
1752        let this = self.clone();
1753        let watch = self.drain_watch()?;
1754        let clients = config.clone();
1755
1756        let handle = tokio::spawn(async move {
1757            // Send a register message to the control plane
1758            let endpoint = config
1759                .as_ref()
1760                .map(|c| c.endpoint.clone())
1761                .unwrap_or_else(|| "unknown".to_string());
1762            info!(%endpoint, "connected to control plane");
1763
1764            let mut retry_connect = false;
1765
1766            let register_request = ControlMessage {
1767                message_id: uuid::Uuid::new_v4().to_string(),
1768                payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
1769                    node_id: this.inner.id.to_string(),
1770                    group_name: this.inner.group_name.clone(),
1771                    connection_details: this.inner.connection_details.clone(),
1772                })),
1773            };
1774
1775            // send register request if client
1776            if config.is_some()
1777                && let Err(e) = tx.send(Ok(register_request)).await
1778            {
1779                error!(error = %e.chain(), "failed to send register request");
1780                return;
1781            }
1782
1783            // TODO; here we should wait for an ack
1784
1785            let mut drain_fut = std::pin::pin!(watch.clone().signaled());
1786
1787            loop {
1788                tokio::select! {
1789                    next = stream.next() => {
1790                        match next {
1791                            Some(Ok(msg)) => {
1792                                if let Err(e) = this.handle_new_control_message(msg, &tx).await {
1793                                    error!(error = %e.chain(), "error processing incoming control message");
1794                                }
1795                            }
1796                            Some(Err(e)) => {
1797                                if let Some(io_err) = Self::match_for_io_error(&e) {
1798                                    if io_err.kind() == std::io::ErrorKind::BrokenPipe {
1799                                        info!("connection closed by peer");
1800                                    } else {
1801                                        // Handle other IO errors (ConnectionAborted, etc.)
1802                                        error!(
1803                                            error = %e.chain(),
1804                                            io_error_kind = ?io_err.kind(),
1805                                            "IO error receiving control messages"
1806                                        );
1807                                    }
1808                                } else {
1809                                    // Handle non-IO errors (e.g., gRPC Canceled, Unavailable, etc.)
1810                                    error!(error = %e.chain(), "error receiving control messages");
1811                                }
1812
1813                                retry_connect = true;
1814                                break;
1815                            }
1816                            None => {
1817                                debug!("end of stream");
1818                                retry_connect = true;
1819                                break;
1820                            }
1821                        }
1822                    }
1823                    Some(session_msg) = async {
1824                        match &mut timer_rx {
1825                            Some(rx) => rx.recv().await,
1826                            None => std::future::pending().await,
1827                        }
1828                    } => {
1829                        match session_msg {
1830                            SessionMessage::TimerFailure { message_id, message_type: _, name: _, timeouts: _} => {
1831                                tracing::info!("got a failure for message id: {}", message_id);
1832                                // if there's a timer the clientconfig is always set
1833                                if let Some(clients) = &clients {
1834                                    this.send_ack_message(message_id, false, std::slice::from_ref(clients)).await;
1835                                }
1836                            }
1837                            _ => {
1838                                error!("unexpected session message received in controller");
1839                            }
1840                        }
1841                    }
1842                    _ = cancellation_token.cancelled() => {
1843                        debug!("shutting down stream on cancellation token");
1844                        break;
1845                    }
1846                    _ = &mut drain_fut => {
1847                        debug!("shutting down stream on drain");
1848                        break;
1849                    }
1850                }
1851            }
1852
1853            info!(%endpoint, "control plane stream closed");
1854
1855            if retry_connect && let Some(config) = config {
1856                info!(%config.endpoint, "retrying connection to control plane");
1857                this.connect(config.clone(), cancellation_token)
1858                    .await
1859                    .map_or_else(
1860                        |e| {
1861                            error!(error = %e.chain(), "failed to reconnect to control plane");
1862                        },
1863                        |tx| {
1864                            info!(%config.endpoint, "reconnected to control plane");
1865
1866                            this.inner
1867                                .tx_channels
1868                                .write()
1869                                .insert(config.endpoint.clone(), tx);
1870                        },
1871                    )
1872            }
1873        });
1874
1875        Ok(handle)
1876    }
1877
1878    /// Connect to the control plane using the provided client configuration.
1879    /// This function attempts to establish a connection to the control plane and returns a sender for control messages.
1880    /// It retries the connection a specified number of times if it fails.
1881    async fn connect(
1882        &self,
1883        config: ClientConfig,
1884        cancellation_token: CancellationToken,
1885    ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
1886        info!(%config.endpoint, "connecting to control plane");
1887
1888        let channel = config.to_channel().await?;
1889
1890        let mut client = ControllerServiceClient::new(channel.clone());
1891        let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1892        let out_stream = ReceiverStream::new(rx).map(|res| res.expect("mapping error"));
1893        let stream = client
1894            .open_control_channel(Request::new(out_stream))
1895            .await?;
1896
1897        self.send_queued_notifications(&tx, &config.endpoint).await;
1898
1899        let timer_settings = TimerSettings::new(
1900            Duration::from_millis(2000),
1901            None,
1902            Some(0),
1903            TimerType::Constant,
1904        );
1905        let (timer_tx, timer_rx) = mpsc::channel::<SessionMessage>(128);
1906        let timer_factory = TimerFactory::new(timer_settings, timer_tx.clone());
1907        self.inner.timer_factory.write().replace(timer_factory);
1908
1909        // start processing the incoming stream
1910        self.process_control_message_stream(
1911            Some(config),
1912            stream.into_inner(),
1913            Some(timer_rx),
1914            tx.clone(),
1915            cancellation_token.clone(),
1916        )?;
1917
1918        // return the sender
1919        Ok(tx)
1920    }
1921
1922    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1923        let mut err: &(dyn std::error::Error + 'static) = err_status;
1924
1925        loop {
1926            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1927                return Some(io_err);
1928            }
1929
1930            // h2::Error do not expose std::io::Error with `source()`
1931            // https://github.com/hyperium/h2/pull/462
1932            if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1933                && let Some(io_err) = h2_err.get_io()
1934            {
1935                return Some(io_err);
1936            }
1937
1938            err = err.source()?;
1939        }
1940    }
1941}
1942
1943#[tonic::async_trait]
1944impl GrpcControllerService for ControllerService {
1945    type OpenControlChannelStream =
1946        Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
1947
1948    async fn open_control_channel(
1949        &self,
1950        request: Request<tonic::Streaming<ControlMessage>>,
1951    ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
1952        // Get the remote endpoint from the request metadata
1953        let remote_endpoint = request
1954            .remote_addr()
1955            .map(|addr| addr.to_string())
1956            .unwrap_or_else(|| "unknown".to_string());
1957
1958        let stream = request.into_inner();
1959        let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1960
1961        let cancellation_token = CancellationToken::new();
1962
1963        // Server-side connections don't initiate operations requiring acks, so no timer channel needed
1964        self.process_control_message_stream(
1965            None,
1966            stream,
1967            None,
1968            tx.clone(),
1969            cancellation_token.clone(),
1970        )
1971        .map_err(|e| {
1972            error!(error = %e.chain(), "error processing control message stream");
1973            Status::unavailable("failed to process control message stream")
1974        })?;
1975
1976        // store the sender in the tx_channels map
1977        self.inner
1978            .tx_channels
1979            .write()
1980            .insert(remote_endpoint.clone(), tx);
1981
1982        // store the cancellation token in the controller service
1983        self.inner
1984            .cancellation_tokens
1985            .write()
1986            .insert(remote_endpoint.clone(), cancellation_token);
1987
1988        let out_stream = ReceiverStream::new(rx);
1989        Ok(Response::new(
1990            Box::pin(out_stream) as Self::OpenControlChannelStream
1991        ))
1992    }
1993}
1994
1995#[cfg(test)]
1996mod tests {
1997    use super::*;
1998    use slim_auth::shared_secret::SharedSecret;
1999    use slim_config::component::id::Kind;
2000    use slim_testing::utils::TEST_VALID_SECRET;
2001    use tracing_test::traced_test;
2002
2003    /// Helper to build a server and client control plane pair with shared-secret auth.
2004    async fn setup_control_planes(
2005        server_endpoint: &str,
2006        server_name: &str,
2007        client_name: &str,
2008    ) -> (ControlPlane, ControlPlane, ClientConfig) {
2009        let id_server = ID::new_with_name(Kind::new("slim").unwrap(), server_name).unwrap();
2010        let id_client = ID::new_with_name(Kind::new("slim").unwrap(), client_name).unwrap();
2011
2012        let server_config = ServerConfig::with_endpoint(server_endpoint)
2013            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
2014        let client_config = ClientConfig::with_endpoint(&format!("http://{}", server_endpoint))
2015            .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());
2016
2017        let message_processor_server = MessageProcessor::new();
2018        let message_processor_client = MessageProcessor::new();
2019
2020        let control_plane_server = ControlPlane::new(ControlPlaneSettings {
2021            id: id_server,
2022            group_name: None,
2023            servers: vec![server_config.clone()],
2024            clients: vec![],
2025            message_processor: Arc::new(message_processor_server),
2026            auth_provider: Some(AuthProvider::SharedSecret(
2027                SharedSecret::new("server", TEST_VALID_SECRET).unwrap(),
2028            )),
2029            auth_verifier: Some(AuthVerifier::SharedSecret(
2030                SharedSecret::new("server", TEST_VALID_SECRET).unwrap(),
2031            )),
2032            connection_details: vec![from_server_config(&server_config)],
2033        });
2034
2035        let control_plane_client = ControlPlane::new(ControlPlaneSettings {
2036            id: id_client,
2037            group_name: None,
2038            servers: vec![],
2039            clients: vec![client_config.clone()],
2040            message_processor: Arc::new(message_processor_client),
2041            auth_provider: Some(AuthProvider::SharedSecret(
2042                SharedSecret::new("client", TEST_VALID_SECRET).unwrap(),
2043            )),
2044            auth_verifier: Some(AuthVerifier::SharedSecret(
2045                SharedSecret::new("client", TEST_VALID_SECRET).unwrap(),
2046            )),
2047            connection_details: vec![],
2048        });
2049
2050        (control_plane_server, control_plane_client, client_config)
2051    }
2052
2053    #[tokio::test]
2054    #[traced_test]
2055    async fn test_end_to_end() {
2056        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2057            setup_control_planes(
2058                "127.0.0.1:50051",
2059                "test-server-instance",
2060                "test-client-instance",
2061            )
2062            .await;
2063
2064        control_plane_server.run().await.unwrap();
2065        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2066        control_plane_client.run().await.unwrap();
2067        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2068
2069        assert!(logs_contain("received a register node request"));
2070    }
2071
2072    #[test]
2073    fn test_generate_session_id() {
2074        let moderator_a = Name::from_strings(["Org", "Ns", "Moderator"]).with_id(42);
2075        let moderator_b = Name::from_strings(["Org", "Ns", "Moderator"]).with_id(43); // different id
2076        let channel_x = Name::from_strings(["Org", "Ns", "ChannelX"]).with_id(7);
2077        let channel_y = Name::from_strings(["Org", "Ns", "ChannelY"]).with_id(7); // different last component
2078
2079        let id1 = generate_session_id(&moderator_a, &channel_x);
2080        let id2 = generate_session_id(&moderator_a, &channel_x);
2081        assert_eq!(id1, id2, "hash must be deterministic for same inputs");
2082
2083        let id3 = generate_session_id(&moderator_b, &channel_x);
2084        assert_ne!(id1, id3, "changing moderator id should change session id");
2085
2086        let id4 = generate_session_id(&moderator_a, &channel_y);
2087        assert_ne!(id1, id4, "changing channel name should change session id");
2088
2089        // Ensure moderate spread (not strictly required, but sanity check that values aren't zero)
2090        assert!(
2091            id1 != 0 && id3 != 0 && id4 != 0,
2092            "session ids should not be zero"
2093        );
2094    }
2095
2096    #[tokio::test]
2097    #[traced_test]
2098    async fn test_subscription_notification_queue_drain() {
2099        // Reuse common setup with a different port to avoid clash with other tests.
2100        let (mut control_plane_server, mut control_plane_client, client_config) =
2101            setup_control_planes(
2102                "127.0.0.1:50061",
2103                "queue-drain-server",
2104                "queue-drain-client",
2105            )
2106            .await;
2107
2108        let controller = control_plane_client.controller.clone();
2109        assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2110
2111        const N: usize = 5;
2112        for i in 0..N {
2113            let ctrl_msg = ControlMessage {
2114                message_id: uuid::Uuid::new_v4().to_string(),
2115                payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2116                    connections_to_create: vec![],
2117                    connections_to_delete: vec![],
2118                    subscriptions_to_set: vec![v1::Subscription {
2119                        component_0: "queued".to_string(),
2120                        component_1: "sub".to_string(),
2121                        component_2: format!("name-{i}"),
2122                        id: Some(i as u64),
2123                        connection_id: "test-conn".to_string(),
2124                        node_id: None,
2125                        link_id: None,
2126                        direction: None,
2127                    }],
2128                    subscriptions_to_delete: vec![],
2129                })),
2130            };
2131            controller
2132                .send_or_queue_notification(ctrl_msg, std::slice::from_ref(&client_config))
2133                .await;
2134        }
2135        assert_eq!(controller.inner.pending_notifications.lock().len(), N);
2136
2137        control_plane_server.run().await.expect("server run failed");
2138        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2139        control_plane_client.run().await.expect("client run failed");
2140        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2141
2142        assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2143        assert!(
2144            logs_contain(&format!("sending {} queued subscription notifications", N)),
2145            "Expected log about sending queued subscription notifications"
2146        );
2147
2148        drop(controller);
2149        drop(control_plane_server);
2150        drop(control_plane_client);
2151    }
2152
2153    #[tokio::test]
2154    #[traced_test]
2155    async fn test_delete_connection_by_link_id_success_ack() {
2156        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2157            setup_control_planes(
2158                "127.0.0.1:50081",
2159                "delete-linkid-server",
2160                "delete-linkid-client",
2161            )
2162            .await;
2163
2164        control_plane_server.run().await.unwrap();
2165        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2166        control_plane_client.run().await.unwrap();
2167        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2168
2169        let controller = control_plane_client.controller.clone();
2170        let link_id = "test-delete-link-id".to_string();
2171        let mut assigned = false;
2172        for _ in 0..50 {
2173            controller
2174                .inner
2175                .message_processor
2176                .connection_table()
2177                .for_each(|_, conn| {
2178                    if !assigned {
2179                        conn.set_link_id(link_id.clone());
2180                        assigned = true;
2181                    }
2182                });
2183            if assigned {
2184                break;
2185            }
2186            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2187        }
2188        assert!(
2189            assigned,
2190            "expected at least one connection to assign link_id"
2191        );
2192
2193        let ctrl_msg = ControlMessage {
2194            message_id: uuid::Uuid::new_v4().to_string(),
2195            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2196                connections_to_create: vec![],
2197                connections_to_delete: vec![link_id.clone()],
2198                subscriptions_to_set: vec![],
2199                subscriptions_to_delete: vec![],
2200            })),
2201        };
2202        let (tx, mut rx) = mpsc::channel(1);
2203        controller
2204            .handle_new_control_message(ctrl_msg, &tx)
2205            .await
2206            .expect("config command must be handled");
2207
2208        let ack_msg = rx
2209            .recv()
2210            .await
2211            .expect("expected ack message")
2212            .expect("ack should be ok");
2213        let ack = match ack_msg.payload {
2214            Some(Payload::ConfigCommandAck(ack)) => ack,
2215            _ => panic!("expected ConfigCommandAck payload"),
2216        };
2217        assert_eq!(ack.connections_status.len(), 1);
2218        assert_eq!(ack.connections_status[0].connection_id, link_id);
2219        assert!(ack.connections_status[0].success);
2220    }
2221
2222    #[tokio::test]
2223    #[traced_test]
2224    async fn test_delete_connection_by_link_id_unknown_fails_ack() {
2225        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2226            "127.0.0.1:50082",
2227            "delete-linkid-server-unknown",
2228            "delete-linkid-client-unknown",
2229        )
2230        .await;
2231
2232        let controller = control_plane_client.controller.clone();
2233        let ctrl_msg = ControlMessage {
2234            message_id: uuid::Uuid::new_v4().to_string(),
2235            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2236                connections_to_create: vec![],
2237                connections_to_delete: vec!["unknown-link-id".to_string()],
2238                subscriptions_to_set: vec![],
2239                subscriptions_to_delete: vec![],
2240            })),
2241        };
2242        let (tx, mut rx) = mpsc::channel(1);
2243        controller
2244            .handle_new_control_message(ctrl_msg, &tx)
2245            .await
2246            .expect("config command must be handled");
2247
2248        let ack_msg = rx
2249            .recv()
2250            .await
2251            .expect("expected ack message")
2252            .expect("ack should be ok");
2253        let ack = match ack_msg.payload {
2254            Some(Payload::ConfigCommandAck(ack)) => ack,
2255            _ => panic!("expected ConfigCommandAck payload"),
2256        };
2257        assert_eq!(ack.connections_status.len(), 1);
2258        assert_eq!(ack.connections_status[0].connection_id, "unknown-link-id");
2259        assert!(!ack.connections_status[0].success);
2260        assert!(ack.connections_status[0].error_msg.contains("not found"));
2261
2262        drop(control_plane_server);
2263    }
2264
2265    #[tokio::test]
2266    #[traced_test]
2267    async fn test_create_connection_with_existing_link_id_reuses_connection_ack() {
2268        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2269            setup_control_planes(
2270                "127.0.0.1:50083",
2271                "create-linkid-server",
2272                "create-linkid-client",
2273            )
2274            .await;
2275
2276        control_plane_server.run().await.unwrap();
2277        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2278        control_plane_client.run().await.unwrap();
2279        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2280
2281        let controller = control_plane_client.controller.clone();
2282        let link_id = "test-create-link-id".to_string();
2283        let mut assigned = false;
2284        for _ in 0..50 {
2285            controller
2286                .inner
2287                .message_processor
2288                .connection_table()
2289                .for_each(|_, conn| {
2290                    if !assigned {
2291                        conn.set_link_id(link_id.clone());
2292                        assigned = true;
2293                    }
2294                });
2295            if assigned {
2296                break;
2297            }
2298            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2299        }
2300        assert!(
2301            assigned,
2302            "expected at least one connection to assign link_id"
2303        );
2304
2305        let endpoint = "http://127.0.0.1:59999";
2306        let connection_config = serde_json::json!({
2307            "endpoint": endpoint,
2308            "link_id": link_id
2309        })
2310        .to_string();
2311
2312        let ctrl_msg = ControlMessage {
2313            message_id: uuid::Uuid::new_v4().to_string(),
2314            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2315                connections_to_create: vec![v1::Connection {
2316                    connection_id: "reuse-existing-link".to_string(),
2317                    config_data: connection_config,
2318                }],
2319                connections_to_delete: vec![],
2320                subscriptions_to_set: vec![],
2321                subscriptions_to_delete: vec![],
2322            })),
2323        };
2324
2325        let (tx, mut rx) = mpsc::channel(1);
2326        controller
2327            .handle_new_control_message(ctrl_msg, &tx)
2328            .await
2329            .expect("config command must be handled");
2330
2331        let ack_msg = rx
2332            .recv()
2333            .await
2334            .expect("expected ack message")
2335            .expect("ack should be ok");
2336        let ack = match ack_msg.payload {
2337            Some(Payload::ConfigCommandAck(ack)) => ack,
2338            _ => panic!("expected ConfigCommandAck payload"),
2339        };
2340        assert_eq!(ack.connections_status.len(), 1);
2341        assert_eq!(
2342            ack.connections_status[0].connection_id,
2343            "reuse-existing-link"
2344        );
2345        assert!(ack.connections_status[0].success);
2346
2347        assert!(
2348            controller.inner.connections.read().contains_key(endpoint),
2349            "expected endpoint to be mapped to reused connection id"
2350        );
2351    }
2352
2353    #[tokio::test]
2354    #[traced_test]
2355    async fn test_subscription_set_unknown_link_id_fails_ack() {
2356        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2357            "127.0.0.1:50084",
2358            "sub-linkid-server-unknown",
2359            "sub-linkid-client-unknown",
2360        )
2361        .await;
2362
2363        let controller = control_plane_client.controller.clone();
2364        let ctrl_msg = ControlMessage {
2365            message_id: uuid::Uuid::new_v4().to_string(),
2366            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2367                connections_to_create: vec![],
2368                connections_to_delete: vec![],
2369                subscriptions_to_set: vec![v1::Subscription {
2370                    component_0: "org".to_string(),
2371                    component_1: "ns".to_string(),
2372                    component_2: "agent".to_string(),
2373                    id: Some(1),
2374                    connection_id: String::new(),
2375                    node_id: None,
2376                    link_id: Some("missing-link-id".to_string()),
2377                    direction: None,
2378                }],
2379                subscriptions_to_delete: vec![],
2380            })),
2381        };
2382        let (tx, mut rx) = mpsc::channel(1);
2383        controller
2384            .handle_new_control_message(ctrl_msg, &tx)
2385            .await
2386            .expect("config command must be handled");
2387
2388        let ack_msg = rx
2389            .recv()
2390            .await
2391            .expect("expected ack message")
2392            .expect("ack should be ok");
2393        let ack = match ack_msg.payload {
2394            Some(Payload::ConfigCommandAck(ack)) => ack,
2395            _ => panic!("expected ConfigCommandAck payload"),
2396        };
2397
2398        assert_eq!(ack.subscriptions_status.len(), 1);
2399        assert!(!ack.subscriptions_status[0].success);
2400        assert!(
2401            ack.subscriptions_status[0]
2402                .error_msg
2403                .contains("Connection with link_id missing-link-id not found")
2404        );
2405
2406        drop(control_plane_server);
2407    }
2408
2409    #[tokio::test]
2410    #[traced_test]
2411    async fn test_create_connection_invalid_config_fails_ack() {
2412        let (_control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2413            "127.0.0.1:50085",
2414            "create-invalid-config-server",
2415            "create-invalid-config-client",
2416        )
2417        .await;
2418
2419        let controller = control_plane_client.controller.clone();
2420        let ctrl_msg = ControlMessage {
2421            message_id: uuid::Uuid::new_v4().to_string(),
2422            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2423                connections_to_create: vec![v1::Connection {
2424                    connection_id: "invalid-config-conn".to_string(),
2425                    config_data: "{invalid-json".to_string(),
2426                }],
2427                connections_to_delete: vec![],
2428                subscriptions_to_set: vec![],
2429                subscriptions_to_delete: vec![],
2430            })),
2431        };
2432        let (tx, mut rx) = mpsc::channel(1);
2433        controller
2434            .handle_new_control_message(ctrl_msg, &tx)
2435            .await
2436            .expect("config command must be handled");
2437
2438        let ack_msg = rx
2439            .recv()
2440            .await
2441            .expect("expected ack message")
2442            .expect("ack should be ok");
2443        let ack = match ack_msg.payload {
2444            Some(Payload::ConfigCommandAck(ack)) => ack,
2445            _ => panic!("expected ConfigCommandAck payload"),
2446        };
2447        assert_eq!(ack.connections_status.len(), 1);
2448        assert!(!ack.connections_status[0].success);
2449        assert!(
2450            ack.connections_status[0]
2451                .error_msg
2452                .contains("Failed to parse config")
2453        );
2454    }
2455
2456    #[tokio::test]
2457    #[traced_test]
2458    async fn test_subscription_delete_unknown_link_id_fails_ack() {
2459        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2460            "127.0.0.1:50086",
2461            "sub-del-linkid-server-unknown",
2462            "sub-del-linkid-client-unknown",
2463        )
2464        .await;
2465
2466        let controller = control_plane_client.controller.clone();
2467        let ctrl_msg = ControlMessage {
2468            message_id: uuid::Uuid::new_v4().to_string(),
2469            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2470                connections_to_create: vec![],
2471                connections_to_delete: vec![],
2472                subscriptions_to_set: vec![],
2473                subscriptions_to_delete: vec![v1::Subscription {
2474                    component_0: "org".to_string(),
2475                    component_1: "ns".to_string(),
2476                    component_2: "agent".to_string(),
2477                    id: Some(1),
2478                    connection_id: String::new(),
2479                    node_id: None,
2480                    link_id: Some("missing-link-id-delete".to_string()),
2481                    direction: None,
2482                }],
2483            })),
2484        };
2485        let (tx, mut rx) = mpsc::channel(1);
2486        controller
2487            .handle_new_control_message(ctrl_msg, &tx)
2488            .await
2489            .expect("config command must be handled");
2490
2491        let ack_msg = rx
2492            .recv()
2493            .await
2494            .expect("expected ack message")
2495            .expect("ack should be ok");
2496        let ack = match ack_msg.payload {
2497            Some(Payload::ConfigCommandAck(ack)) => ack,
2498            _ => panic!("expected ConfigCommandAck payload"),
2499        };
2500
2501        assert_eq!(ack.subscriptions_status.len(), 1);
2502        assert!(!ack.subscriptions_status[0].success);
2503        assert!(
2504            ack.subscriptions_status[0]
2505                .error_msg
2506                .contains("Connection with link_id missing-link-id-delete not found")
2507        );
2508
2509        drop(control_plane_server);
2510    }
2511
2512    #[tokio::test]
2513    #[traced_test]
2514    async fn test_shutdown_drains_resources() {
2515        // Use a unique port to avoid conflicts with other tests.
2516        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2517            setup_control_planes(
2518                "127.0.0.1:50071",
2519                "shutdown-server-instance",
2520                "shutdown-client-instance",
2521            )
2522            .await;
2523
2524        // Run both ends to populate cancellation tokens.
2525        control_plane_server
2526            .run()
2527            .await
2528            .expect("server should start");
2529        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2530        control_plane_client
2531            .run()
2532            .await
2533            .expect("client should start");
2534        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2535
2536        // Ensure we have at least one cancellation token (server or client side tasks).
2537        let server_tokens_before = control_plane_server
2538            .controller
2539            .inner
2540            .cancellation_tokens
2541            .read()
2542            .len();
2543        assert!(
2544            server_tokens_before > 0,
2545            "expected server to have active cancellation tokens before shutdown"
2546        );
2547
2548        let client_tokens_before = control_plane_client
2549            .controller
2550            .inner
2551            .cancellation_tokens
2552            .read()
2553            .len();
2554        assert!(
2555            client_tokens_before > 0,
2556            "expected client to have active cancellation tokens before shutdown"
2557        );
2558
2559        // Perform shutdown on both.
2560        control_plane_client
2561            .shutdown()
2562            .await
2563            .expect("client shutdown ok");
2564        control_plane_server
2565            .shutdown()
2566            .await
2567            .expect("server shutdown ok");
2568
2569        // After shutdown, all cancellation tokens should be drained.
2570        let server_tokens_after = control_plane_server
2571            .controller
2572            .inner
2573            .cancellation_tokens
2574            .read()
2575            .len();
2576        assert_eq!(
2577            server_tokens_after, 0,
2578            "expected server cancellation tokens to be drained after shutdown"
2579        );
2580
2581        let client_tokens_after = control_plane_client
2582            .controller
2583            .inner
2584            .cancellation_tokens
2585            .read()
2586            .len();
2587        assert_eq!(
2588            client_tokens_after, 0,
2589            "expected client cancellation tokens to be drained after shutdown"
2590        );
2591
2592        // Second shutdown should error because drain_signal has been taken.
2593        assert!(
2594            control_plane_server.shutdown().await.is_err(),
2595            "second shutdown on server should return an error"
2596        );
2597        assert!(
2598            control_plane_client.shutdown().await.is_err(),
2599            "second shutdown on client should return an error"
2600        );
2601    }
2602
2603    #[tokio::test]
2604    #[traced_test]
2605    async fn test_shutdown_without_run() {
2606        // Build a control plane but do NOT call run()
2607        let (control_plane_server, mut _control_plane_client, _client_cfg) = setup_control_planes(
2608            "127.0.0.1:50072",
2609            "shutdown-no-run-server",
2610            "shutdown-no-run-client",
2611        )
2612        .await;
2613
2614        // No tasks should be registered yet
2615        assert_eq!(
2616            control_plane_server
2617                .controller
2618                .inner
2619                .cancellation_tokens
2620                .read()
2621                .len(),
2622            0,
2623            "expected zero cancellation tokens before shutdown when not run"
2624        );
2625
2626        // Shutdown should still succeed gracefully.
2627        control_plane_server
2628            .shutdown()
2629            .await
2630            .expect("shutdown without prior run should succeed");
2631
2632        // Tokens remain zero.
2633        assert_eq!(
2634            control_plane_server
2635                .controller
2636                .inner
2637                .cancellation_tokens
2638                .read()
2639                .len(),
2640            0,
2641            "expected zero cancellation tokens after shutdown when not run"
2642        );
2643
2644        // Second shutdown should fail.
2645        assert!(
2646            control_plane_server.shutdown().await.is_err(),
2647            "second shutdown should error due to missing drain signal"
2648        );
2649    }
2650}