Skip to main content

slim_controller/
service.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{HashMap, VecDeque};
5use std::pin::Pin;
6use std::sync::Arc;
7
8use std::time::Duration;
9
10use display_error_chain::ErrorChainExt;
11use slim_config::component::id::ID;
12use slim_config::server::ServerConfig;
13use slim_session::subscription_manager::SubscriptionManager;
14use tokio::sync::mpsc;
15use tokio::task::JoinHandle;
16use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
17use tokio_util::sync::CancellationToken;
18use tonic::{Request, Response, Status};
19use tracing::{debug, error, info};
20
21use crate::api::proto::api::v1::control_message::Payload;
22use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
23use crate::api::proto::api::v1::{
24    self, ConnectionDetails, ConnectionDirection, ConnectionListResponse, ConnectionType,
25    RouteListResponse,
26};
27use crate::api::proto::api::v1::{
28    ConnectionEntry, ControlMessage, RouteEntry,
29    controller_service_client::ControllerServiceClient,
30    controller_service_server::ControllerService as GrpcControllerService,
31};
32use crate::errors::ControllerError;
33use prost_types::Struct;
34use slim_config::client::{ClientConfig, TransportChannel};
35use slim_datapath::api::{
36    MessageType::Subscribe, MessageType::SubscriptionAck as SubscriptionAckType,
37    MessageType::Unsubscribe, ProtoMessage as DataPlaneMessage,
38};
39use slim_datapath::api::{NameId, ProtoName};
40use slim_datapath::message_processing::MessageProcessor;
41use slim_datapath::messages::utils::SlimHeaderFlags;
42use slim_datapath::tables::SubscriptionTable;
43
/// Sending half of a control-plane stream: each item is either a control
/// message to deliver or a gRPC `Status` describing a failure.
type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
/// Control-message senders keyed by a `String`; in this file the key is the
/// endpoint of the connection the sender belongs to.
type TxChannels = HashMap<String, TxChannel>;

/// Maximum number of queued subscription notifications
const MAX_QUEUED_NOTIFICATIONS: usize = 1000;

/// Timeout (30 seconds) for waiting on a subscription ack from the datapath.
const SUBSCRIPTION_ACK_TIMEOUT: Duration = Duration::from_secs(30);
52
/// Settings struct for creating a ControlPlane instance.
///
/// The whole value is consumed by `ControlPlane::new`, which moves every field
/// either into the `ControlPlane` itself or into its shared controller state.
#[derive(Clone)]
pub struct ControlPlaneSettings {
    /// ID of this SLIM instance
    pub id: ID,
    /// Optional group name
    pub group_name: Option<String>,
    /// Server configurations; `ControlPlane::run` starts one server per entry
    pub servers: Vec<ServerConfig>,
    /// Client configurations; `ControlPlane::run` opens one connection per entry
    pub clients: Vec<ClientConfig>,
    /// Message processor instance (shared through an `Arc`)
    pub message_processor: Arc<MessageProcessor>,
    /// array of connection details used by the control
    /// plane to store the connection settings (e.g., TLS settings).
    pub connection_details: Vec<ConnectionDetails>,
}
70
/// Inner structure for the controller service
/// This structure holds the internal state of the controller service,
/// including the ID, message processor, connections, and channels.
/// It is normally wrapped in an Arc to allow shared ownership across multiple threads.
/// Mutable members are guarded individually by `parking_lot` locks.
struct ControllerServiceInternal {
    /// ID of this SLIM instance
    id: ID,

    /// optional group name
    group_name: Option<String>,

    /// underlying message processor
    message_processor: Arc<MessageProcessor>,

    /// channel to send messages into the datapath
    tx_slim: mpsc::Sender<Result<DataPlaneMessage, Status>>,

    /// channels to send control messages, keyed by endpoint
    tx_channels: parking_lot::RwLock<TxChannels>,

    /// cancellation tokens for graceful shutdown, keyed by endpoint; the
    /// data-plane listener registers its token under the key "DATA_PLANE"
    cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,

    /// drain watch channel; set to `None` once shutdown has taken it
    drain_watch: parking_lot::RwLock<Option<drain::Watch>>,

    /// queue for pending subscription notifications when connections are down
    pending_notifications: Arc<parking_lot::Mutex<VecDeque<ControlMessage>>>,

    /// Manages pending subscription ack tracking (id generation, registration, resolution).
    subscription_manager: SubscriptionManager,

    /// connection details used by control plane to store connection settings
    connection_details: Vec<ConnectionDetails>,

    /// Maps (subscription_name, conn_id) → subscription_id for route tracking.
    /// The second key component is the datapath connection id: entries are
    /// inserted and purged by `conn_id`, not by the textual link id.
    route_subscription_ids: parking_lot::Mutex<HashMap<(ProtoName, u64), u64>>,

    /// Reverse index: link_id → conn_id for O(1) lookup.
    /// Populated lazily on first resolve and eagerly on connection create/delete.
    link_id_to_conn_id: parking_lot::RwLock<HashMap<String, u64>>,

    /// JoinHandles for control-plane stream processing tasks, keyed by endpoint.
    stream_handles: parking_lot::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
}
116
/// Handle to the shared controller state.
///
/// Cloning only clones the inner `Arc`, so every clone observes and mutates
/// the same `ControllerServiceInternal`.
#[derive(Clone)]
struct ControllerService {
    /// internal service state
    inner: Arc<ControllerServiceInternal>,
}
122
/// The ControlPlane service is the main entry point for the controller service.
///
/// It owns the server/client configurations to start, the drain signal used
/// for shutdown, and a handle to the shared controller state.
pub struct ControlPlane {
    /// servers
    servers: Vec<ServerConfig>,

    /// clients
    clients: Vec<ClientConfig>,

    /// drain signal channel; taken (left as `None`) by `shutdown`, so a second
    /// shutdown reports `ControllerError::AlreadyStopped`
    drain_signal: parking_lot::RwLock<Option<drain::Signal>>,

    /// controller
    controller: ControllerService,

    /// channel to receive message from the datapath
    /// to be used in listen_from_data_plane.
    /// Taken (left as `None`) by `run`, so a second `run` reports
    /// `ControllerError::AlreadyStarted`.
    rx_slim_option: Option<mpsc::Receiver<Result<DataPlaneMessage, Status>>>,
}
141
142/// ControllerServiceInternal implements Drop trait to cancel all running listeners and
143/// clean up resources.
144impl Drop for ControlPlane {
145    fn drop(&mut self) {
146        // cancel all running listeners
147        for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
148            token.cancel();
149        }
150    }
151}
152
153pub(crate) fn from_server_config(server_config: &ServerConfig) -> ConnectionDetails {
154    let mut endpoint = server_config.endpoint.clone();
155    let mut external_endpoint = None;
156    let mut spire_socket_path = None;
157    let mut trust_domain = None;
158    let mut remaining_fields = std::collections::BTreeMap::new();
159
160    if let Some(m) = &server_config.metadata {
161        for (k, v) in &m.inner {
162            match k.as_str() {
163                "local_endpoint" => {
164                    if let Some(s) = v.as_str()
165                        && !s.is_empty()
166                    {
167                        endpoint = s.to_string();
168                    }
169                }
170                "external_endpoint" => {
171                    if let Some(s) = v.as_str()
172                        && !s.is_empty()
173                    {
174                        external_endpoint = Some(s.to_string());
175                    }
176                }
177                "spire_socket_path" => {
178                    if let Some(s) = v.as_str()
179                        && !s.is_empty()
180                    {
181                        spire_socket_path = Some(s.to_string());
182                    }
183                }
184                "trust_domain" => {
185                    if let Some(s) = v.as_str()
186                        && !s.is_empty()
187                    {
188                        trust_domain = Some(s.to_string());
189                    }
190                }
191                _ => {
192                    remaining_fields.insert(k.clone(), prost_types::Value::from(v));
193                }
194            }
195        }
196    }
197
198    let spire_mtls = if !server_config.tls_setting.insecure || spire_socket_path.is_some() {
199        Some(v1::connection_details::SpireMtls {
200            socket_path: spire_socket_path.unwrap_or_default(),
201            trust_domain,
202        })
203    } else {
204        None
205    };
206
207    let metadata = if remaining_fields.is_empty() {
208        None
209    } else {
210        Some(Struct {
211            fields: remaining_fields,
212        })
213    };
214
215    ConnectionDetails {
216        endpoint,
217        external_endpoint,
218        spire_mtls,
219        metadata,
220    }
221}
222
/// Inherent methods of `ControlPlane`: construction, start-up, and shutdown of the controller service.
224impl ControlPlane {
225    /// Create a new ControlPlane service instance
226    /// This function initializes the ControlPlane with the given ID, servers, clients, and message processor.
227    /// It also sets up the internal state, including the connections and channels.
228    /// # Arguments
229    /// * `id` - The ID of the SLIM instance.
230    /// * `servers` - A vector of server configurations.
231    /// * `clients` - A vector of client configurations.
232    /// * `drain_rx` - A drain watch channel for graceful shutdown.
233    /// * `message_processor` - An Arc to the message processor instance.
234    /// * `pubsub_servers` - A slice of server configurations for pub/sub connections.
235    /// # Returns
236    /// A new instance of ControlPlane.
237    pub fn new(config: ControlPlaneSettings) -> Self {
238        // create local connection with the message processor
239        let (_, tx_slim, rx_slim) = config
240            .message_processor
241            .register_local_connection(true)
242            .unwrap();
243
244        let (signal, watch) = drain::channel();
245
246        ControlPlane {
247            servers: config.servers,
248            clients: config.clients,
249            controller: ControllerService {
250                inner: Arc::new(ControllerServiceInternal {
251                    id: config.id,
252                    group_name: config.group_name,
253                    message_processor: config.message_processor,
254                    subscription_manager: SubscriptionManager::new(tx_slim.clone()),
255                    tx_slim,
256                    tx_channels: parking_lot::RwLock::new(HashMap::new()),
257                    cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
258                    drain_watch: parking_lot::RwLock::new(Some(watch)),
259                    pending_notifications: Arc::new(parking_lot::Mutex::new(VecDeque::new())),
260                    connection_details: config.connection_details,
261                    route_subscription_ids: parking_lot::Mutex::new(HashMap::new()),
262                    link_id_to_conn_id: parking_lot::RwLock::new(HashMap::new()),
263                    stream_handles: parking_lot::Mutex::new(HashMap::new()),
264                }),
265            },
266            drain_signal: parking_lot::RwLock::new(Some(signal)),
267            rx_slim_option: Some(rx_slim),
268        }
269    }
270
    /// Replace the client configurations of this ControlPlane and hand it back.
    ///
    /// Builder-style: the same instance is consumed and returned, not copied.
    /// Only the stored list changes; no connection is opened here.
    pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
        self.clients = clients;
        self
    }
276
    /// Replace the server configurations of this ControlPlane and hand it back.
    ///
    /// Builder-style: the same instance is consumed and returned, not copied.
    /// Only the stored list changes; no server is started here.
    pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
        self.servers = servers;
        self
    }
282
283    /// Run the clients and servers of the ControlPlane service.
284    /// This function starts all the servers and clients defined in the ControlPlane.
285    /// # Returns
286    /// A Result indicating success or failure of the operation.
287    /// # Errors
288    /// If there is an error starting any of the servers or clients, it will return a ControllerError.
289    pub async fn run(&mut self) -> Result<(), ControllerError> {
290        let rx = self
291            .rx_slim_option
292            .take()
293            .ok_or(ControllerError::AlreadyStarted)?;
294
295        // Collect servers to avoid borrowing self both mutably and immutably
296        let servers = self.servers.clone();
297        let clients = self.clients.clone();
298
299        // run all servers
300        for server in servers {
301            self.run_server(server).await?;
302        }
303
304        // run all clients
305        for client in clients {
306            self.run_client(client).await?;
307        }
308
309        self.listen_from_data_plane(rx).await?;
310
311        Ok(())
312    }
313
314    pub async fn deregister(&self) -> Result<(), ControllerError> {
315        let node_id = self.controller.inner.id.to_string();
316        let deregister_msg = ControlMessage {
317            message_id: uuid::Uuid::new_v4().to_string(),
318            payload: Some(Payload::DeregisterNodeRequest(v1::DeregisterNodeRequest {
319                node: Some(v1::Node { id: node_id }),
320            })),
321        };
322        let channels: Vec<(String, TxChannel)> = self
323            .controller
324            .inner
325            .tx_channels
326            .read()
327            .iter()
328            .map(|(ep, tx)| (ep.clone(), tx.clone()))
329            .collect();
330        for (endpoint, tx) in channels {
331            if let Err(e) = tx.send(Ok(deregister_msg.clone())).await {
332                error!(%endpoint, error = %e, "failed to send deregister request");
333            }
334        }
335        Ok(())
336    }
337
338    pub async fn shutdown(&self) -> Result<(), ControllerError> {
339        // Get signal drain
340        let signal = self
341            .drain_signal
342            .write()
343            .take()
344            .ok_or(ControllerError::AlreadyStopped)?;
345
346        // Stop everything using the cancellation tokens
347        self.controller
348            .inner
349            .cancellation_tokens
350            .write()
351            .drain()
352            .for_each(|(endpoint, token)| {
353                info!(%endpoint, "stopping");
354                token.cancel();
355            });
356
357        // Drop watch channel
358        self.controller.inner.drain_watch.write().take();
359
360        // Wait for drain to complete
361        signal.drain().await;
362
363        Ok(())
364    }
365
366    async fn listen_from_data_plane(
367        &mut self,
368        mut rx: mpsc::Receiver<Result<DataPlaneMessage, Status>>,
369    ) -> Result<(), ControllerError> {
370        let cancellation_token = CancellationToken::new();
371        let cancellation_token_clone = cancellation_token.clone();
372
373        self.controller
374            .inner
375            .cancellation_tokens
376            .write()
377            .insert("DATA_PLANE".to_string(), cancellation_token_clone);
378
379        let clients = self.clients.clone();
380        let controller = self.controller.clone();
381
382        // Get a drain watch clone
383        let watch = self.controller.drain_watch()?;
384
385        debug!("Starting data plane listener");
386        tokio::spawn(async move {
387            let mut drain_fut = std::pin::pin!(watch.signaled());
388            loop {
389                tokio::select! {
390                    next = rx.recv() => {
391                        match next {
392                            Some(res) => {
393                                match res {
394                                    Ok(msg) => {
395                                        debug!("Send sub/unsub to control plane for message: {:?}", msg);
396                                        match msg.get_type() {
397                                            Subscribe(_) => {
398                                                controller.handle_subscribe_message(msg.get_dst(), &clients).await;
399                                            }
400                                            Unsubscribe(_) => {
401                                                controller.handle_unsubscribe_message(msg.get_dst(), &clients).await;
402                                            }
403                                            SubscriptionAckType(_) => {
404                                                controller.inner.subscription_manager.resolve_ack(msg.get_subscription_ack());
405                                            }
406                                            _ => {
407                                                debug!("Ignoring unexpected message type from dataplane: {:?}", msg.get_type());
408                                            }
409                                        }
410                                    }
411                                    Err(e) => {
412                                        error!(error = %e.chain(), "received error from the data plane");
413                                        continue;
414                                    }
415                                }
416                            }
417                            None => {
418                                debug!("Data plane receiver channel closed.");
419                                break;
420                            }
421                        }
422                    }
423                    _ = cancellation_token.cancelled() => {
424                        debug!("shutting down stream on cancellation token");
425                        break;
426                    }
427                    _ = &mut drain_fut => {
428                        debug!("shutting down stream on drain");
429                        break;
430                    }
431                }
432            }
433        });
434        Ok(())
435    }
436
437    /// Stop the ControlPlane service.
438    /// This function stops all running listeners and cancels any ongoing operations.
439    /// It cleans up the internal state and ensures that all resources are released properly.
440    pub fn stop(&mut self) {
441        info!("stopping controller service");
442
443        // cancel all running listeners
444        for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
445            info!(%endpoint, "stopping");
446            token.cancel();
447        }
448    }
449
450    /// Run a client configuration.
451    /// This function connects to the control plane using the provided client configuration.
452    /// It checks if the client is already running and if not, it starts a new connection.
453    async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
454        if self
455            .controller
456            .inner
457            .cancellation_tokens
458            .read()
459            .contains_key(&client.endpoint)
460        {
461            return Err(ControllerError::ClientAlreadyRunning(client.endpoint));
462        }
463
464        let cancellation_token = CancellationToken::new();
465
466        let tx = self
467            .controller
468            .connect(client.clone(), cancellation_token.clone())
469            .await?;
470
471        // Store the cancellation token in the controller service
472        self.controller
473            .inner
474            .cancellation_tokens
475            .write()
476            .insert(client.endpoint.clone(), cancellation_token);
477
478        // Store the sender in the tx_channels map
479        self.controller
480            .inner
481            .tx_channels
482            .write()
483            .insert(client.endpoint.clone(), tx);
484
485        // return the sender for control messages
486        Ok(())
487    }
488
489    /// Run a server configuration.
490    /// This function starts a server using the provided server configuration.
491    /// It checks if the server is already running and if not, it starts a new server.
492    pub async fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
493        // Check if the server is already running
494        if self
495            .controller
496            .inner
497            .cancellation_tokens
498            .read()
499            .contains_key(&config.endpoint)
500        {
501            error!(endpoint = config.endpoint, "server is already running",);
502            return Err(ControllerError::ServerAlreadyRunning(config.endpoint));
503        }
504
505        let token = config
506            .run_grpc_server(
507                &[ControllerServiceServer::new(self.controller.clone())],
508                self.controller.drain_watch()?,
509            )
510            .await?;
511
512        // Store the cancellation token in the controller service
513        self.controller
514            .inner
515            .cancellation_tokens
516            .write()
517            .insert(config.endpoint.clone(), token.clone());
518
519        info!(%config.endpoint, "started controlplane server");
520
521        Ok(())
522    }
523}
524
525impl ControllerService {
526    fn resolve_connection_by_link_id(&self, link_id: &str) -> Result<Option<u64>, String> {
527        let cached = self.inner.link_id_to_conn_id.read().get(link_id).copied();
528        if let Some(conn_id) = cached {
529            if self
530                .inner
531                .message_processor
532                .connection_table()
533                .get(conn_id)
534                .is_some_and(|conn| conn.link_id().as_deref() == Some(link_id))
535            {
536                return Ok(Some(conn_id));
537            }
538            self.inner.link_id_to_conn_id.write().remove(link_id);
539        }
540
541        let mut resolved: Option<u64> = None;
542        self.inner
543            .message_processor
544            .connection_table()
545            .for_each(|id, conn| {
546                if conn.link_id().as_deref() == Some(link_id) && resolved.is_none() {
547                    resolved = Some(id);
548                }
549            });
550
551        if let Some(conn_id) = resolved {
552            self.inner
553                .link_id_to_conn_id
554                .write()
555                .insert(link_id.to_string(), conn_id);
556        }
557
558        Ok(resolved)
559    }
560
561    fn disconnect_connection_by_link_id(&self, link_id: &str) -> Result<(), String> {
562        if link_id.trim().is_empty() {
563            return Err("link_id cannot be empty".to_string());
564        }
565
566        let conn_id = match self.resolve_connection_by_link_id(link_id)? {
567            Some(id) => id,
568            None => {
569                return Err(format!("Connection with link_id {} not found", link_id));
570            }
571        };
572
573        if let Err(e) = self.inner.message_processor.disconnect(conn_id) {
574            // Best-effort delete: local/control-plane connections can lack config_data.
575            info!(
576                link_id = %link_id,
577                conn_id,
578                error = %e,
579                "Disconnect returned an error; continuing delete flow"
580            );
581        }
582
583        self.inner.link_id_to_conn_id.write().remove(link_id);
584        self.inner
585            .route_subscription_ids
586            .lock()
587            .retain(|(_name, cid), _| *cid != conn_id);
588
589        info!(link_id = %link_id, conn_id, "Successfully deleted connection by link_id");
590        Ok(())
591    }
592
593    fn resolve_route_connection(&self, route: &v1::Route) -> Result<Option<u64>, String> {
594        if let Some(link_id) = &route.link_id {
595            let trimmed = link_id.trim();
596            if !trimmed.is_empty() {
597                return self.resolve_connection_by_link_id(trimmed);
598            }
599        }
600
601        Ok(None)
602    }
603
604    async fn diff_connections(
605        &self,
606        desired_connections: &[v1::Connection],
607    ) -> Vec<v1::ConnectionAck> {
608        let mut connections_status = Vec::new();
609
610        let desired_link_ids: std::collections::HashSet<String> = desired_connections
611            .iter()
612            .map(|c| c.link_id.clone())
613            .collect();
614
615        let mut live_outgoing_link_ids: Vec<String> = Vec::new();
616        self.inner
617            .message_processor
618            .connection_table()
619            .for_each(|_id, conn| {
620                if conn.is_outgoing()
621                    && let Some(lid) = conn.link_id()
622                    && !lid.is_empty()
623                {
624                    live_outgoing_link_ids.push(lid);
625                }
626            });
627
628        for link_id in &live_outgoing_link_ids {
629            if desired_link_ids.contains(link_id) {
630                continue;
631            }
632            info!(link_id = %link_id, "desired state: removing connection");
633            let mut success = true;
634            let mut error_msg = String::new();
635            if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
636                success = false;
637                error_msg = err;
638            }
639            connections_status.push(v1::ConnectionAck {
640                link_id: link_id.clone(),
641                success,
642                error_msg,
643            });
644        }
645
646        for conn in desired_connections {
647            let link_id = &conn.link_id;
648            if link_id.is_empty() {
649                continue;
650            }
651
652            let already_exists = match self.resolve_connection_by_link_id(link_id) {
653                Ok(Some(_)) => true,
654                Ok(None) => false,
655                Err(err) => {
656                    connections_status.push(v1::ConnectionAck {
657                        link_id: link_id.clone(),
658                        success: false,
659                        error_msg: err,
660                    });
661                    continue;
662                }
663            };
664
665            if already_exists {
666                connections_status.push(v1::ConnectionAck {
667                    link_id: link_id.clone(),
668                    success: true,
669                    error_msg: String::new(),
670                });
671                continue;
672            }
673
674            info!(?conn, "desired state: creating connection");
675            let mut success = true;
676            let mut error_msg = String::new();
677
678            match serde_json::from_str::<ClientConfig>(&conn.config_data) {
679                Err(e) => {
680                    success = false;
681                    error_msg = format!("Failed to parse config: {}", e);
682                }
683                Ok(client_config) => {
684                    match self
685                        .inner
686                        .message_processor
687                        .connect(client_config.clone(), None, None)
688                        .await
689                    {
690                        Err(e) => {
691                            success = false;
692                            error_msg = format!("Connection failed: {}", e);
693                        }
694                        Ok(conn_id) => {
695                            let requested_link_id = if client_config.link_id.trim().is_empty() {
696                                String::new()
697                            } else {
698                                client_config.link_id.clone()
699                            };
700                            if !requested_link_id.is_empty() {
701                                self.inner
702                                    .link_id_to_conn_id
703                                    .write()
704                                    .insert(requested_link_id, conn_id.1);
705                            }
706                            info!(
707                                link_id = %link_id,
708                                "Successfully created connection"
709                            );
710                        }
711                    }
712                }
713            }
714
715            connections_status.push(v1::ConnectionAck {
716                link_id: link_id.clone(),
717                success,
718                error_msg,
719            });
720        }
721
722        connections_status
723    }
724
725    fn resolve_desired_routes<'a>(
726        &self,
727        desired_routes: &'a [v1::Route],
728    ) -> (HashMap<(ProtoName, u64), &'a v1::Route>, Vec<v1::RouteAck>) {
729        type SubKey = (ProtoName, u64);
730        let mut desired_subs: HashMap<SubKey, &v1::Route> = HashMap::new();
731        let mut failures: Vec<v1::RouteAck> = Vec::new();
732
733        for sub in desired_routes {
734            match self.resolve_route_connection(sub) {
735                Ok(Some(conn_id)) => {
736                    let name = sub.name.clone().unwrap();
737                    desired_subs.insert((name, conn_id), sub);
738                }
739                Ok(None) => {
740                    failures.push(v1::RouteAck {
741                        route: Some(sub.clone()),
742                        success: false,
743                        error_msg: "connection not found".to_string(),
744                    });
745                }
746                Err(err) => {
747                    failures.push(v1::RouteAck {
748                        route: Some(sub.clone()),
749                        success: false,
750                        error_msg: err,
751                    });
752                }
753            }
754        }
755
756        (desired_subs, failures)
757    }
758
759    async fn delete_stale_subscriptions(
760        &self,
761        desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
762    ) -> Vec<v1::RouteAck> {
763        let active_subs: Vec<((ProtoName, u64), u64)> = self
764            .inner
765            .route_subscription_ids
766            .lock()
767            .iter()
768            .map(|((name, conn_id), sub_id)| ((name.clone(), *conn_id), *sub_id))
769            .collect();
770
771        let stale: Vec<_> = active_subs
772            .into_iter()
773            .filter(|((name, conn_id), _)| !desired_subs.contains_key(&(name.clone(), *conn_id)))
774            .collect();
775
776        let futs = stale.iter().map(|((name, conn_id), sub_id)| {
777            let name = name.clone();
778            let conn_id = *conn_id;
779            let sub_id = *sub_id;
780            async move {
781                let conn_alive = self
782                    .inner
783                    .message_processor
784                    .connection_table()
785                    .get(conn_id)
786                    .is_some();
787
788                let (success, error_msg) = if conn_alive {
789                    let source = name.clone().with_id(0);
790                    let unsub_msg = DataPlaneMessage::builder()
791                        .source(source)
792                        .destination(name.clone())
793                        .identity("")
794                        .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
795                        .build_unsubscribe()
796                        .unwrap();
797
798                    match self
799                        .send_unsubscribe_message_with_ack(unsub_msg, sub_id)
800                        .await
801                    {
802                        Ok(()) => (true, String::new()),
803                        Err(err) => (false, format!("Failed to unsubscribe: {}", err)),
804                    }
805                } else {
806                    (true, String::new())
807                };
808
809                (name, conn_id, success, error_msg)
810            }
811        });
812
813        let results = futures::future::join_all(futs).await;
814
815        let mut routes_status = Vec::with_capacity(results.len());
816        for (name, conn_id, success, error_msg) in results {
817            if success {
818                self.inner
819                    .route_subscription_ids
820                    .lock()
821                    .remove(&(name.clone(), conn_id));
822            }
823
824            routes_status.push(v1::RouteAck {
825                route: Some(v1::Route {
826                    name: Some(name.clone()),
827                    link_id: None,
828                    direction: None,
829                }),
830                success,
831                error_msg,
832            });
833        }
834
835        routes_status
836    }
837
838    async fn create_new_subscriptions(
839        &self,
840        desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
841    ) -> Vec<v1::RouteAck> {
842        let mut routes_status = Vec::new();
843
844        let to_create: Vec<((ProtoName, u64), v1::Route)> = desired_subs
845            .iter()
846            .filter(|((name, conn_id), _)| {
847                !self
848                    .inner
849                    .route_subscription_ids
850                    .lock()
851                    .contains_key(&(name.clone(), *conn_id))
852            })
853            .map(|((name, conn_id), sub)| ((name.clone(), *conn_id), (*sub).clone()))
854            .collect();
855
856        // Report already-active subscriptions as success.
857        for ((name, conn_id), sub) in desired_subs {
858            let dominated = to_create
859                .iter()
860                .any(|((n, c), _)| n == name && *c == *conn_id);
861            if !dominated {
862                routes_status.push(v1::RouteAck {
863                    route: Some((*sub).clone()),
864                    success: true,
865                    error_msg: String::new(),
866                });
867            }
868        }
869
870        let futs = to_create.iter().map(|((name, conn_id), sub)| {
871            let name = name.clone();
872            let conn_id = *conn_id;
873            let sub = sub.clone();
874            async move {
875                let source = name.clone().with_id(0);
876                let sub_msg = DataPlaneMessage::builder()
877                    .source(source)
878                    .destination(name.clone())
879                    .identity("")
880                    .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
881                    .build_subscribe()
882                    .unwrap();
883
884                let result = self.send_subscribe_message_with_ack(sub_msg).await;
885                (name, conn_id, sub, result)
886            }
887        });
888
889        let results = futures::future::join_all(futs).await;
890
891        for (name, conn_id, sub, result) in results {
892            let (success, error_msg) = match result {
893                Ok(subscription_id) => {
894                    self.inner
895                        .route_subscription_ids
896                        .lock()
897                        .insert((name, conn_id), subscription_id);
898                    info!(?sub, "desired state: created route");
899                    (true, String::new())
900                }
901                Err(err) => (false, format!("Failed to subscribe: {}", err)),
902            };
903
904            routes_status.push(v1::RouteAck {
905                route: Some(sub),
906                success,
907                error_msg,
908            });
909        }
910
911        routes_status
912    }
913
914    /// Handle new control messages.
915    async fn handle_new_control_message(
916        &self,
917        msg: ControlMessage,
918        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
919    ) -> Result<(), ControllerError> {
920        match msg.payload {
921            Some(ref payload) => {
922                match payload {
923                    Payload::ConfigCommand(config) => {
924                        let (connections_status, routes_status) = if config.reconcile {
925                            let connections_status =
926                                self.diff_connections(&config.connections_to_create).await;
927                            let (desired_subs, mut routes_status) =
928                                self.resolve_desired_routes(&config.routes_to_set);
929                            routes_status
930                                .extend(self.delete_stale_subscriptions(&desired_subs).await);
931                            routes_status
932                                .extend(self.create_new_subscriptions(&desired_subs).await);
933                            (connections_status, routes_status)
934                        } else {
935                            let mut connections_status = Vec::new();
936                            let mut routes_status = Vec::new();
937
938                            // Process connections to delete by link_id.
939                            for link_id in &config.connections_to_delete {
940                                info!(link_id = %link_id, "received a connection to delete");
941                                let mut connection_success = true;
942                                let mut connection_error_msg = String::new();
943
944                                if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
945                                    connection_success = false;
946                                    connection_error_msg = err;
947                                }
948
949                                connections_status.push(v1::ConnectionAck {
950                                    link_id: link_id.clone(),
951                                    success: connection_success,
952                                    error_msg: connection_error_msg,
953                                });
954                            }
955
956                            // Process connections to create
957                            for conn in &config.connections_to_create {
958                                info!(?conn, "received a connection to create");
959                                let mut connection_success = true;
960                                let mut connection_error_msg = String::new();
961
962                                match serde_json::from_str::<ClientConfig>(&conn.config_data) {
963                                    Err(e) => {
964                                        connection_success = false;
965                                        connection_error_msg =
966                                            format!("Failed to parse config: {}", e);
967                                    }
968                                    Ok(client_config) => {
969                                        let client_endpoint = &client_config.endpoint;
970                                        let requested_link_id =
971                                            if client_config.link_id.trim().is_empty() {
972                                                String::new()
973                                            } else {
974                                                client_config.link_id.clone()
975                                            };
976                                        let mut existing_conn_for_link_id = false;
977
978                                        if !requested_link_id.is_empty() {
979                                            match self
980                                                .resolve_connection_by_link_id(&requested_link_id)
981                                            {
982                                                Err(err) => {
983                                                    connection_success = false;
984                                                    connection_error_msg = err;
985                                                }
986                                                Ok(Some(conn_id)) => {
987                                                    existing_conn_for_link_id = true;
988                                                    self.inner
989                                                        .link_id_to_conn_id
990                                                        .write()
991                                                        .insert(requested_link_id.clone(), conn_id);
992                                                    info!(
993                                                        link_id = %requested_link_id,
994                                                        conn_id,
995                                                        "Connection already exists for link_id"
996                                                    );
997                                                }
998                                                Ok(None) => {}
999                                            }
1000                                        }
1001
1002                                        if connection_success && !existing_conn_for_link_id {
1003                                            match self
1004                                                .inner
1005                                                .message_processor
1006                                                .connect(client_config.clone(), None, None)
1007                                                .await
1008                                            {
1009                                                Err(e) => {
1010                                                    connection_success = false;
1011                                                    connection_error_msg =
1012                                                        format!("Connection failed: {}", e);
1013                                                }
1014                                                Ok(conn_id) => {
1015                                                    if !requested_link_id.is_empty() {
1016                                                        self.inner
1017                                                            .link_id_to_conn_id
1018                                                            .write()
1019                                                            .insert(
1020                                                                requested_link_id.clone(),
1021                                                                conn_id.1,
1022                                                            );
1023                                                    }
1024                                                    info!(
1025                                                        endpoint = %client_endpoint, "Successfully created connection",
1026                                                    );
1027                                                }
1028                                            }
1029                                        }
1030                                    }
1031                                }
1032
1033                                // Add connection status
1034                                connections_status.push(v1::ConnectionAck {
1035                                    link_id: conn.link_id.clone(),
1036                                    success: connection_success,
1037                                    error_msg: connection_error_msg,
1038                                });
1039                            }
1040
1041                            // Process routes to set
1042                            for route in &config.routes_to_set {
1043                                let mut route_success = true;
1044                                let mut route_error_msg = String::new();
1045
1046                                let conn = self.resolve_route_connection(route);
1047
1048                                if let Ok(Some(conn)) = conn {
1049                                    let name = route.name.clone().unwrap();
1050                                    let source = name.clone().with_id(NameId::NULL_COMPONENT);
1051
1052                                    let msg = DataPlaneMessage::builder()
1053                                        .source(source.clone())
1054                                        .destination(name.clone())
1055                                        .identity("")
1056                                        .flags(SlimHeaderFlags::default().with_recv_from(conn))
1057                                        .build_subscribe()
1058                                        .unwrap();
1059
1060                                    match self.send_subscribe_message_with_ack(msg).await {
1061                                        Ok(subscription_id) => {
1062                                            // Store the subscription_id for later unsubscription
1063                                            self.inner
1064                                                .route_subscription_ids
1065                                                .lock()
1066                                                .insert((name.clone(), conn), subscription_id);
1067                                            info!(?route, "Successfully created route");
1068                                        }
1069                                        Err(err) => {
1070                                            route_success = false;
1071                                            route_error_msg =
1072                                                format!("Failed to subscribe: {}", err);
1073                                        }
1074                                    }
1075                                } else {
1076                                    route_success = false;
1077                                    route_error_msg = match conn {
1078                                        Ok(None) => {
1079                                            format!(
1080                                                "Connection with link_id {} not found",
1081                                                route.link_id.as_deref().unwrap_or("<none>")
1082                                            )
1083                                        }
1084                                        Err(err) => err,
1085                                        _ => "unknown connection lookup error".to_string(),
1086                                    };
1087                                }
1088
1089                                // Add route status
1090                                routes_status.push(v1::RouteAck {
1091                                    route: Some(route.clone()),
1092                                    success: route_success,
1093                                    error_msg: route_error_msg,
1094                                });
1095                            }
1096
1097                            // Process routes to delete
1098                            for route in &config.routes_to_delete {
1099                                let mut route_success = true;
1100                                let mut route_error_msg = String::new();
1101
1102                                let conn = self.resolve_route_connection(route);
1103
1104                                if let Ok(Some(conn)) = conn {
1105                                    let name = route.name.clone().unwrap();
1106                                    let source = name.clone().with_id(NameId::NULL_COMPONENT);
1107
1108                                    let msg = DataPlaneMessage::builder()
1109                                        .source(source.clone())
1110                                        .destination(name.clone())
1111                                        .identity("")
1112                                        .flags(SlimHeaderFlags::default().with_recv_from(conn))
1113                                        .build_unsubscribe()
1114                                        .unwrap();
1115
1116                                    let sub_id = self
1117                                        .inner
1118                                        .route_subscription_ids
1119                                        .lock()
1120                                        .remove(&(name.clone(), conn));
1121                                    let unsubscribe_result = match sub_id {
1122                                        Some(subscription_id) => {
1123                                            self.send_unsubscribe_message_with_ack(
1124                                                msg,
1125                                                subscription_id,
1126                                            )
1127                                            .await
1128                                        }
1129                                        None => {
1130                                            // No tracking ID in the map (e.g. after a CP restart
1131                                            // or for orphan cleanup from the reconciler).  Generate
1132                                            // a fresh ID — the datapath locates the subscription
1133                                            // by Name+connection, not by ID; the ID is only used
1134                                            // for ack correlation.
1135                                            let (ack_id, ack_rx) =
1136                                                self.inner.subscription_manager.register_ack();
1137                                            let mut fresh_msg = msg;
1138                                            fresh_msg.set_subscription_id(ack_id);
1139                                            if let Err(e) =
1140                                                self.send_control_message(fresh_msg).await
1141                                            {
1142                                                self.inner.subscription_manager.cancel_ack(ack_id);
1143                                                Err(format!("datapath send error: {}", e.chain()))
1144                                            } else {
1145                                                match tokio::time::timeout(
1146                                                    SUBSCRIPTION_ACK_TIMEOUT,
1147                                                    ack_rx,
1148                                                )
1149                                                .await
1150                                                {
1151                                                    Ok(Ok(Ok(()))) => Ok(()),
1152                                                    Ok(Ok(Err(err))) => Err(err.to_string()),
1153                                                    Ok(Err(_)) => {
1154                                                        Err("subscription ack channel closed"
1155                                                            .to_string())
1156                                                    }
1157                                                    Err(_) => {
1158                                                        self.inner
1159                                                            .subscription_manager
1160                                                            .cancel_ack(ack_id);
1161                                                        Err("subscription ack timed out"
1162                                                            .to_string())
1163                                                    }
1164                                                }
1165                                            }
1166                                        }
1167                                    };
1168                                    if let Err(err) = unsubscribe_result {
1169                                        route_success = false;
1170                                        route_error_msg = format!("Failed to unsubscribe: {}", err);
1171                                    } else {
1172                                        info!(?route, "Successfully deleted route");
1173                                    }
1174                                } else {
1175                                    route_success = false;
1176                                    route_error_msg = match conn {
1177                                        Ok(None) => {
1178                                            format!(
1179                                                "Connection with link_id {} not found",
1180                                                route.link_id.as_deref().unwrap_or("<none>")
1181                                            )
1182                                        }
1183                                        Err(err) => err,
1184                                        _ => "unknown connection lookup error".to_string(),
1185                                    };
1186                                }
1187
1188                                // Add route status (for deletion)
1189                                routes_status.push(v1::RouteAck {
1190                                    route: Some(route.clone()),
1191                                    success: route_success,
1192                                    error_msg: route_error_msg,
1193                                });
1194                            }
1195
1196                            (connections_status, routes_status)
1197                        };
1198
1199                        let config_ack = v1::ConfigurationCommandAck {
1200                            original_message_id: msg.message_id.clone(),
1201                            connections_status,
1202                            routes_status,
1203                        };
1204
1205                        let reply = ControlMessage {
1206                            message_id: uuid::Uuid::new_v4().to_string(),
1207                            payload: Some(Payload::ConfigCommandAck(config_ack)),
1208                        };
1209
1210                        if let Err(e) = tx.send(Ok(reply)).await {
1211                            error!(error = %e.chain(), "failed to send ConfigurationCommandAck");
1212                        }
1213
1214                        info!(
1215                            connections = %config.connections_to_create.len(),
1216                            connections_to_delete = %config.connections_to_delete.len(),
1217                            routes_to_set = %config.routes_to_set.len(),
1218                            routes_to_del = %config.routes_to_delete.len(),
1219                            "Processed ConfigurationCommand"
1220                        );
1221                    }
1222                    Payload::RouteListRequest(_) => {
1223                        const CHUNK_SIZE: usize = 100;
1224
1225                        let conn_table = self.inner.message_processor.connection_table();
1226                        let mut entries = Vec::new();
1227
1228                        self.inner.message_processor.subscription_table().for_each(
1229                            |name, id, local, remote, _peer| {
1230                                let mut entry = RouteEntry {
1231                                    name: Some(name.clone().with_id(id)),
1232                                    ..Default::default()
1233                                };
1234
1235                                for &cid in local {
1236                                    entry.connections.push(ConnectionEntry {
1237                                        id: cid,
1238                                        connection_type: ConnectionType::Local as i32,
1239                                        config_data: "{}".to_string(),
1240                                        link_id: None,
1241                                        direction: ConnectionDirection::Outgoing as i32,
1242                                    });
1243                                }
1244
1245                                for &cid in remote {
1246                                    if let Some(conn) = conn_table.get(cid) {
1247                                        entry.connections.push(ConnectionEntry {
1248                                            id: cid,
1249                                            connection_type: ConnectionType::Remote as i32,
1250                                            config_data: match conn.config_data() {
1251                                                Some(data) => serde_json::to_string(data)
1252                                                    .unwrap_or_else(|_| "{}".to_string()),
1253                                                None => "{}".to_string(),
1254                                            },
1255                                            link_id: conn.link_id(),
1256                                            direction: if conn.is_outgoing() {
1257                                                ConnectionDirection::Outgoing as i32
1258                                            } else {
1259                                                ConnectionDirection::Incoming as i32
1260                                            },
1261                                        });
1262                                    } else {
1263                                        error!(%cid, "no connection entry for id");
1264                                    }
1265                                }
1266                                entries.push(entry);
1267                            },
1268                        );
1269
1270                        let chunks: Vec<_> = entries.chunks(CHUNK_SIZE).collect();
1271                        if chunks.is_empty() {
1272                            let resp = ControlMessage {
1273                                message_id: uuid::Uuid::new_v4().to_string(),
1274                                payload: Some(Payload::RouteListResponse(RouteListResponse {
1275                                    original_message_id: msg.message_id.clone(),
1276                                    entries: vec![],
1277                                    done: true,
1278                                })),
1279                            };
1280                            if let Err(e) = tx.send(Ok(resp)).await {
1281                                error!(error = %e.chain(), "failed to send route list response");
1282                            }
1283                        } else {
1284                            let n = chunks.len();
1285                            for (i, chunk) in chunks.into_iter().enumerate() {
1286                                let resp = ControlMessage {
1287                                    message_id: uuid::Uuid::new_v4().to_string(),
1288                                    payload: Some(Payload::RouteListResponse(RouteListResponse {
1289                                        original_message_id: msg.message_id.clone(),
1290                                        entries: chunk.to_vec(),
1291                                        done: i + 1 == n,
1292                                    })),
1293                                };
1294                                if let Err(e) = tx.send(Ok(resp)).await {
1295                                    error!(error = %e.chain(), "failed to send route batch");
1296                                    break;
1297                                }
1298                            }
1299                        }
1300                    }
1301                    Payload::ConnectionListRequest(_) => {
1302                        let mut all_entries = Vec::new();
1303                        self.inner
1304                            .message_processor
1305                            .connection_table()
1306                            .for_each(|id, conn| {
1307                                debug!(
1308                                    conn_id = id,
1309                                    local_addr = ?conn.local_addr(),
1310                                    remote_addr = ?conn.remote_addr(),
1311                                    is_outgoing = conn.is_outgoing(),
1312                                    link_id = ?conn.link_id(),
1313                                    "connection entry",
1314                                );
1315                                all_entries.push(ConnectionEntry {
1316                                    id,
1317                                    connection_type: ConnectionType::Remote as i32,
1318                                    config_data: match conn.config_data() {
1319                                        Some(data) => serde_json::to_string(data)
1320                                            .unwrap_or_else(|_| "{}".to_string()),
1321                                        None => "{}".to_string(),
1322                                    },
1323                                    link_id: conn.link_id(),
1324                                    direction: if conn.is_outgoing() {
1325                                        ConnectionDirection::Outgoing as i32
1326                                    } else {
1327                                        ConnectionDirection::Incoming as i32
1328                                    },
1329                                });
1330                            });
1331
1332                        const CHUNK_SIZE: usize = 100;
1333                        let chunks: Vec<_> = all_entries.chunks(CHUNK_SIZE).collect();
1334                        if chunks.is_empty() {
1335                            let resp = ControlMessage {
1336                                message_id: uuid::Uuid::new_v4().to_string(),
1337                                payload: Some(Payload::ConnectionListResponse(
1338                                    ConnectionListResponse {
1339                                        original_message_id: msg.message_id.clone(),
1340                                        entries: vec![],
1341                                        done: true,
1342                                    },
1343                                )),
1344                            };
1345                            if let Err(e) = tx.send(Ok(resp)).await {
1346                                error!(error = %e.chain(), "failed to send connection list response");
1347                            }
1348                        } else {
1349                            let n = chunks.len();
1350                            for (i, chunk) in chunks.into_iter().enumerate() {
1351                                let resp = ControlMessage {
1352                                    message_id: uuid::Uuid::new_v4().to_string(),
1353                                    payload: Some(Payload::ConnectionListResponse(
1354                                        ConnectionListResponse {
1355                                            original_message_id: msg.message_id.clone(),
1356                                            entries: chunk.to_vec(),
1357                                            done: i + 1 == n,
1358                                        },
1359                                    )),
1360                                };
1361                                if let Err(e) = tx.send(Ok(resp)).await {
1362                                    error!(error = %e.chain(), "failed to send connection list batch");
1363                                    break;
1364                                }
1365                            }
1366                        }
1367                    }
1368                    Payload::RegisterNodeRequest(_) => {
1369                        error!("received a register node request");
1370                    }
1371                    Payload::DeregisterNodeRequest(_) => {
1372                        error!("received a deregister node request");
1373                    }
1374                    _ => {
1375                        // received unsupported message type, do nothing
1376                        debug!("received unsupported message type from control - ignoring");
1377                    }
1378                }
1379            }
1380            None => {
1381                error!(
1382                    message_id = %msg.message_id,
1383                    "received control message with no payload",
1384                );
1385            }
1386        }
1387
1388        Ok(())
1389    }
1390
1391    async fn handle_subscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1392        let mut sub_vec = vec![];
1393
1394        let cmd = v1::Route {
1395            name: Some(dst),
1396            link_id: None,
1397            direction: None,
1398        };
1399
1400        sub_vec.push(cmd);
1401
1402        let ctrl = ControlMessage {
1403            message_id: uuid::Uuid::new_v4().to_string(),
1404            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1405                connections_to_create: vec![],
1406                connections_to_delete: vec![],
1407                routes_to_set: sub_vec,
1408                routes_to_delete: vec![],
1409                reconcile: false,
1410            })),
1411        };
1412
1413        return self.send_or_queue_notification(ctrl, clients).await;
1414    }
1415
1416    async fn handle_unsubscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1417        let mut unsub_vec = vec![];
1418
1419        let cmd = v1::Route {
1420            name: Some(dst),
1421            link_id: None,
1422            direction: None,
1423        };
1424
1425        unsub_vec.push(cmd);
1426
1427        let ctrl = ControlMessage {
1428            message_id: uuid::Uuid::new_v4().to_string(),
1429            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1430                connections_to_create: vec![],
1431                connections_to_delete: vec![],
1432                routes_to_set: vec![],
1433                routes_to_delete: unsub_vec,
1434                reconcile: false,
1435            })),
1436        };
1437
1438        return self.send_or_queue_notification(ctrl, clients).await;
1439    }
1440
1441    /// Send a subscribe message and await the ack. Returns the subscription_id.
1442    async fn send_subscribe_message_with_ack(
1443        &self,
1444        mut msg: DataPlaneMessage,
1445    ) -> Result<u64, String> {
1446        let (ack_id, ack_rx) = self.inner.subscription_manager.register_ack();
1447        msg.set_subscription_id(ack_id);
1448
1449        if let Err(e) = self.send_control_message(msg).await {
1450            self.inner.subscription_manager.cancel_ack(ack_id);
1451            return Err(format!("datapath send error: {}", e.chain()));
1452        }
1453
1454        match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1455            Ok(Ok(Ok(()))) => Ok(ack_id),
1456            Ok(Ok(Err(err))) => Err(err.to_string()),
1457            Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1458            Err(_) => {
1459                self.inner.subscription_manager.cancel_ack(ack_id);
1460                Err("subscription ack timed out".to_string())
1461            }
1462        }
1463    }
1464
1465    /// Send an unsubscribe message with a given subscription_id and await the ack.
1466    async fn send_unsubscribe_message_with_ack(
1467        &self,
1468        mut msg: DataPlaneMessage,
1469        subscription_id: u64,
1470    ) -> Result<(), String> {
1471        let ack_rx = self
1472            .inner
1473            .subscription_manager
1474            .register_ack_with_id(subscription_id);
1475        msg.set_subscription_id(subscription_id);
1476
1477        if let Err(e) = self.send_control_message(msg).await {
1478            self.inner.subscription_manager.cancel_ack(subscription_id);
1479            return Err(format!("datapath send error: {}", e.chain()));
1480        }
1481
1482        match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1483            Ok(Ok(Ok(()))) => Ok(()),
1484            Ok(Ok(Err(err))) => Err(err.to_string()),
1485            Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1486            Err(_) => {
1487                self.inner.subscription_manager.cancel_ack(subscription_id);
1488                Err("subscription ack timed out".to_string())
1489            }
1490        }
1491    }
1492
1493    /// Send a control message to SLIM.
1494    async fn send_control_message(&self, msg: DataPlaneMessage) -> Result<(), ControllerError> {
1495        self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
1496            error!(error = %e.chain(), "error sending message into datapath");
1497            ControllerError::Datapath(slim_datapath::errors::DataPathError::ConnectionError)
1498        })
1499    }
1500
1501    /// Send notification to control plane or queue it if no connection is available.
1502    ///
1503    /// Uses `try_send` to avoid blocking the caller when the bounded channel is
1504    /// full (e.g. due to HTTP/2 back-pressure).  Blocking here would stall the
1505    /// data-plane listener task, preventing it from resolving subscription acks
1506    /// that the control-message handler is waiting on — a deadlock.
1507    async fn send_or_queue_notification(&self, ctrl_msg: ControlMessage, clients: &[ClientConfig]) {
1508        let mut sent = false;
1509
1510        for c in clients {
1511            let tx = match self.inner.tx_channels.read().get(&c.endpoint) {
1512                Some(tx) => tx.clone(),
1513                None => continue,
1514            };
1515
1516            match tx.try_send(Ok(ctrl_msg.clone())) {
1517                Ok(()) => {
1518                    sent = true;
1519                }
1520                Err(mpsc::error::TrySendError::Full(_)) => {
1521                    debug!(
1522                        endpoint = %c.endpoint,
1523                        "channel full, queuing notification instead of blocking"
1524                    );
1525                }
1526                Err(mpsc::error::TrySendError::Closed(_)) => {
1527                    debug!(
1528                        endpoint = %c.endpoint,
1529                        "channel closed, queuing notification"
1530                    );
1531                }
1532            }
1533        }
1534
1535        if !sent {
1536            let mut queue = self.inner.pending_notifications.lock();
1537            if queue.len() >= MAX_QUEUED_NOTIFICATIONS {
1538                queue.pop_front();
1539                debug!("queue full, removed oldest notification");
1540            }
1541            queue.push_back(ctrl_msg);
1542        }
1543    }
1544
1545    /// Get a drain watch clone to pass to a task
1546    fn drain_watch(&self) -> Result<drain::Watch, ControllerError> {
1547        self.inner
1548            .drain_watch
1549            .read()
1550            .clone()
1551            .ok_or(ControllerError::AlreadyStopped)
1552    }
1553
1554    /// Send all queued subscription notifications when connection is restored.
1555    async fn send_queued_notifications(
1556        &self,
1557        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
1558        endpoint: &str,
1559    ) {
1560        let notifications = {
1561            let mut queue = self.inner.pending_notifications.lock();
1562            if queue.is_empty() {
1563                return;
1564            }
1565            queue.drain(..).collect::<Vec<_>>()
1566        };
1567
1568        if notifications.is_empty() {
1569            return;
1570        }
1571
1572        debug!(
1573            "sending {} queued subscription notifications to {}",
1574            notifications.len(),
1575            endpoint
1576        );
1577
1578        let mut failed_notifications = Vec::new();
1579        for notification in notifications {
1580            if let Err(e) = tx.send(Ok(notification)).await {
1581                error!(
1582                    error = %e.chain(),
1583                    %endpoint,
1584                    "failed to send queued notification to control plane",
1585                );
1586
1587                // we can unwrap here because we know we sent a Ok(ControlMessage)
1588                failed_notifications.push(e.0.unwrap());
1589            }
1590        }
1591
1592        // Re-queue any failed notifications
1593        if !failed_notifications.is_empty() {
1594            self.inner
1595                .pending_notifications
1596                .lock()
1597                .extend(failed_notifications);
1598        }
1599    }
1600
    /// Spawn a task that drives one control-plane stream and return its handle.
    ///
    /// `config` is `Some` for streams this node opened as a client and `None`
    /// otherwise; several steps below only run in the client case. The task,
    /// in order:
    ///
    /// 1. builds a `RegisterNodeRequest` from the current connection table and
    ///    the tracked routes, and sends it on `tx` when `config` is `Some`;
    /// 2. flushes any queued notifications on `tx`;
    /// 3. when `config` is `Some`, waits up to 10 seconds for a
    ///    `RegisterNodeResponse` and applies the connections/routes it carries
    ///    as a `ConfigurationCommand` with `reconcile: true`;
    /// 4. passes every subsequent message from `stream` to
    ///    `handle_new_control_message` until the stream ends or errors,
    ///    `cancellation_token` is cancelled, or the drain watch is signaled;
    /// 5. if the stream ended or errored and `config` is `Some`, calls
    ///    `connect` again and stores the new sender in `tx_channels`.
    ///
    /// # Errors
    ///
    /// Returns the error from `drain_watch` (`ControllerError::AlreadyStopped`)
    /// when no drain watch is available; nothing is spawned in that case.
    fn process_control_message_stream(
        &self,
        config: Option<ClientConfig>,
        mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
        tx: mpsc::Sender<Result<ControlMessage, Status>>,
        cancellation_token: CancellationToken,
    ) -> Result<JoinHandle<()>, ControllerError> {
        let this = self.clone();
        let watch = self.drain_watch()?;

        let handle = tokio::spawn(async move {
            // Endpoint label used in log lines; "unknown" when there is no
            // client config.
            let endpoint = config
                .as_ref()
                .map(|c| c.endpoint.clone())
                .unwrap_or_else(|| "unknown".to_string());
            info!(%endpoint, "connected to control plane");

            // Set only when the stream ends or errors in the main loop below.
            let mut retry_connect = false;

            // Snapshot the connection table for the register request. Every
            // entry is reported with connection type `Remote`; config data
            // that is missing or fails to serialize is reported as "{}".
            let mut active_connections = Vec::new();
            this.inner
                .message_processor
                .connection_table()
                .for_each(|id, conn| {
                    active_connections.push(v1::ConnectionEntry {
                        id,
                        connection_type: v1::ConnectionType::Remote as i32,
                        config_data: match conn.config_data() {
                            Some(data) => {
                                serde_json::to_string(data).unwrap_or_else(|_| "{}".to_string())
                            }
                            None => "{}".to_string(),
                        },
                        link_id: conn.link_id(),
                        direction: if conn.is_outgoing() {
                            v1::ConnectionDirection::Outgoing as i32
                        } else {
                            v1::ConnectionDirection::Incoming as i32
                        },
                    });
                });

            // Snapshot the tracked routes. `link_id_to_conn_id` is inverted so
            // each route's connection id can be reported as a link id; routes
            // whose connection has no known link id get `link_id: None`.
            let active_routes = {
                let conn_id_to_link_id: HashMap<u64, String> = this
                    .inner
                    .link_id_to_conn_id
                    .read()
                    .iter()
                    .map(|(lid, cid)| (*cid, lid.clone()))
                    .collect();
                this.inner
                    .route_subscription_ids
                    .lock()
                    .iter()
                    .map(|((name, conn_id), _sub_id)| v1::Route {
                        name: Some(name.clone()),
                        link_id: conn_id_to_link_id.get(conn_id).cloned(),
                        direction: None,
                    })
                    .collect::<Vec<_>>()
            };

            let register_request = ControlMessage {
                message_id: uuid::Uuid::new_v4().to_string(),
                payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
                    node_id: this.inner.id.to_string(),
                    group_name: this.inner.group_name.clone(),
                    connection_details: this.inner.connection_details.clone(),
                    connections: active_connections,
                    routes: active_routes,
                })),
            };

            // send register request if client
            if config.is_some()
                && let Err(e) = tx.send(Ok(register_request)).await
            {
                error!(error = %e.chain(), "failed to send register request");
                return;
            }

            // Send queued notifications after the register request so that
            // RegisterNodeRequest is always the first message on the stream.
            this.send_queued_notifications(&tx, &endpoint).await;

            // Wait for RegisterNodeResponse from control plane before processing commands.
            if config.is_some() {
                let registration_result = tokio::time::timeout(
                    Duration::from_secs(10),
                    async {
                        loop {
                            tokio::select! {
                                next = stream.next() => {
                                    match next {
                                        Some(Ok(msg)) => {
                                            if let Some(Payload::RegisterNodeResponse(resp)) = msg.payload {
                                                if resp.success {
                                                    info!(
                                                        connections = resp.connections.len(),
                                                        routes = resp.routes.len(),
                                                        "registration acknowledged by control plane"
                                                    );
                                                    return Ok(resp);
                                                } else {
                                                    return Err("registration rejected by control plane".to_string());
                                                }
                                            }
                                            // Any other message received during the
                                            // handshake is discarded without being handled.
                                        }
                                        Some(Err(_)) => return Err("stream error waiting for registration response".to_string()),
                                        None => return Err("stream closed before registration response".to_string()),
                                    }
                                }
                                _ = cancellation_token.cancelled() => {
                                    return Err("cancelled while waiting for registration response".to_string());
                                }
                            }
                        }
                    }
                ).await;

                // NOTE(review): both failure arms below return from the task
                // directly, so the reconnect logic at the end of the task is
                // never reached after a failed or timed-out handshake.
                match registration_result {
                    Ok(Ok(resp)) => {
                        // Apply initial desired state from the control plane.
                        if !resp.connections.is_empty() || !resp.routes.is_empty() {
                            let init_cmd = ControlMessage {
                                message_id: uuid::Uuid::new_v4().to_string(),
                                payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
                                    connections_to_create: resp.connections,
                                    routes_to_set: resp.routes,
                                    routes_to_delete: vec![],
                                    connections_to_delete: vec![],
                                    reconcile: true,
                                })),
                            };
                            if let Err(e) = this.handle_new_control_message(init_cmd, &tx).await {
                                error!(error = %e.chain(), "failed to apply initial state from registration");
                            }
                        }
                    }
                    Ok(Err(e)) => {
                        error!(%e, "registration handshake failed");
                        return;
                    }
                    Err(_) => {
                        error!("registration handshake timed out");
                        return;
                    }
                }
            }

            let mut drain_fut = std::pin::pin!(watch.clone().signaled());

            // Main loop: handle incoming messages until the stream ends or
            // errors (which requests a reconnect) or shutdown is requested
            // through the cancellation token or the drain watch (which does not).
            loop {
                tokio::select! {
                    next = stream.next() => {
                        match next {
                            Some(Ok(msg)) => {
                                if let Err(e) = this.handle_new_control_message(msg, &tx).await {
                                    error!(error = %e.chain(), "error processing incoming control message");
                                }
                            }
                            Some(Err(e)) => {
                                if let Some(io_err) = Self::match_for_io_error(&e) {
                                    if io_err.kind() == std::io::ErrorKind::BrokenPipe {
                                        info!("connection closed by peer");
                                    } else {
                                        // Handle other IO errors (ConnectionAborted, etc.)
                                        error!(
                                            error = %e.chain(),
                                            io_error_kind = ?io_err.kind(),
                                            "IO error receiving control messages"
                                        );
                                    }
                                } else {
                                    // Handle non-IO errors (e.g., gRPC Canceled, Unavailable, etc.)
                                    error!(error = %e.chain(), "error receiving control messages");
                                }

                                retry_connect = true;
                                break;
                            }
                            None => {
                                debug!("end of stream");
                                retry_connect = true;
                                break;
                            }
                        }
                    }
                    _ = cancellation_token.cancelled() => {
                        debug!("shutting down stream on cancellation token");
                        break;
                    }
                    _ = &mut drain_fut => {
                        debug!("shutting down stream on drain");
                        break;
                    }
                }
            }

            info!(%endpoint, "control plane stream closed");

            // Reconnect only for client-side streams that were lost. The new
            // sender replaces the old one in `tx_channels`; a failed reconnect
            // is only logged.
            if retry_connect && let Some(config) = config {
                info!(%config.endpoint, "retrying connection to control plane");
                this.connect(config.clone(), cancellation_token)
                    .await
                    .map_or_else(
                        |e| {
                            error!(error = %e.chain(), "failed to reconnect to control plane");
                        },
                        |tx| {
                            info!(%config.endpoint, "reconnected to control plane");

                            this.inner
                                .tx_channels
                                .write()
                                .insert(config.endpoint.clone(), tx);
                        },
                    )
            }
        });

        Ok(handle)
    }
1826
1827    /// Connect to the control plane using the provided client configuration.
1828    /// This function attempts to establish a connection to the control plane and returns a sender for control messages.
1829    /// It retries the connection a specified number of times if it fails.
1830    async fn connect(
1831        &self,
1832        config: ClientConfig,
1833        cancellation_token: CancellationToken,
1834    ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
1835        info!(%config.endpoint, "connecting to control plane");
1836
1837        // Reset connection state before establishing a new session. The CP
1838        // will send fresh ConfigCommands after re-registration to rebuild
1839        // these maps.
1840        self.inner.route_subscription_ids.lock().clear();
1841        self.inner.link_id_to_conn_id.write().clear();
1842
1843        let channel = match config.to_channel().await? {
1844            TransportChannel::Grpc(c) => c,
1845            TransportChannel::Websocket(_) => {
1846                return Err(ControllerError::ConfigError(
1847                    slim_config::errors::ConfigError::GrpcChannelUnsupportedTransport,
1848                ));
1849            }
1850        };
1851
1852        let mut client = ControllerServiceClient::new(channel.clone());
1853        let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1854        let out_stream = ReceiverStream::new(rx).filter_map(|res| match res {
1855            Ok(msg) => Some(msg),
1856            Err(e) => {
1857                error!(error = %e, "dropping outbound control message due to error");
1858                None
1859            }
1860        });
1861        let stream = client
1862            .open_control_channel(Request::new(out_stream))
1863            .await?;
1864
1865        // start processing the incoming stream
1866        let endpoint_key = config.endpoint.clone();
1867        let handle = self.process_control_message_stream(
1868            Some(config),
1869            stream.into_inner(),
1870            tx.clone(),
1871            cancellation_token.clone(),
1872        )?;
1873        self.inner
1874            .stream_handles
1875            .lock()
1876            .insert(endpoint_key, handle);
1877
1878        // return the sender
1879        Ok(tx)
1880    }
1881
1882    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1883        let mut err: &(dyn std::error::Error + 'static) = err_status;
1884
1885        loop {
1886            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1887                return Some(io_err);
1888            }
1889
1890            // h2::Error do not expose std::io::Error with `source()`
1891            // https://github.com/hyperium/h2/pull/462
1892            if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1893                && let Some(io_err) = h2_err.get_io()
1894            {
1895                return Some(io_err);
1896            }
1897
1898            err = err.source()?;
1899        }
1900    }
1901}
1902
1903#[tonic::async_trait]
1904impl GrpcControllerService for ControllerService {
1905    type OpenControlChannelStream =
1906        Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
1907
1908    async fn open_control_channel(
1909        &self,
1910        request: Request<tonic::Streaming<ControlMessage>>,
1911    ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
1912        // Get the remote endpoint from the request metadata
1913        let remote_endpoint = request
1914            .remote_addr()
1915            .map(|addr| addr.to_string())
1916            .unwrap_or_else(|| "unknown".to_string());
1917
1918        let stream = request.into_inner();
1919        let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1920
1921        let cancellation_token = CancellationToken::new();
1922
1923        // Server-side connections don't initiate operations requiring acks, so no timer channel needed
1924        let handle = self
1925            .process_control_message_stream(None, stream, tx.clone(), cancellation_token.clone())
1926            .map_err(|e| {
1927                error!(error = %e.chain(), "error processing control message stream");
1928                Status::unavailable("failed to process control message stream")
1929            })?;
1930        self.inner
1931            .stream_handles
1932            .lock()
1933            .insert(remote_endpoint.clone(), handle);
1934
1935        self.inner
1936            .tx_channels
1937            .write()
1938            .insert(remote_endpoint.clone(), tx);
1939
1940        if let Some(old_token) = self
1941            .inner
1942            .cancellation_tokens
1943            .write()
1944            .insert(remote_endpoint.clone(), cancellation_token)
1945        {
1946            old_token.cancel();
1947        }
1948
1949        let out_stream = ReceiverStream::new(rx);
1950        Ok(Response::new(
1951            Box::pin(out_stream) as Self::OpenControlChannelStream
1952        ))
1953    }
1954}
1955
1956#[cfg(test)]
1957mod tests {
1958    use super::*;
1959    use slim_config::component::id::Kind;
1960    use tracing_test::traced_test;
1961
1962    async fn setup_control_planes(
1963        server_endpoint: &str,
1964        server_name: &str,
1965        client_name: &str,
1966    ) -> (ControlPlane, ControlPlane, ClientConfig) {
1967        let id_server = ID::new_with_name(Kind::new("slim").unwrap(), server_name).unwrap();
1968        let id_client = ID::new_with_name(Kind::new("slim").unwrap(), client_name).unwrap();
1969
1970        let server_config = ServerConfig::with_endpoint(server_endpoint)
1971            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
1972        let client_config = ClientConfig::with_endpoint(&format!("http://{}", server_endpoint))
1973            .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());
1974
1975        let message_processor_server = MessageProcessor::new();
1976        let message_processor_client = MessageProcessor::new();
1977
1978        let control_plane_server = ControlPlane::new(ControlPlaneSettings {
1979            id: id_server,
1980            group_name: None,
1981            servers: vec![server_config.clone()],
1982            clients: vec![],
1983            message_processor: Arc::new(message_processor_server),
1984            connection_details: vec![from_server_config(&server_config)],
1985        });
1986
1987        let control_plane_client = ControlPlane::new(ControlPlaneSettings {
1988            id: id_client,
1989            group_name: None,
1990            servers: vec![],
1991            clients: vec![client_config.clone()],
1992            message_processor: Arc::new(message_processor_client),
1993            connection_details: vec![],
1994        });
1995
1996        (control_plane_server, control_plane_client, client_config)
1997    }
1998
1999    #[tokio::test]
2000    #[traced_test]
2001    async fn test_end_to_end() {
2002        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2003            setup_control_planes(
2004                "127.0.0.1:50051",
2005                "test-server-instance",
2006                "test-client-instance",
2007            )
2008            .await;
2009
2010        control_plane_server.run().await.unwrap();
2011        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2012        control_plane_client.run().await.unwrap();
2013        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2014
2015        assert!(logs_contain("received a register node request"));
2016    }
2017
2018    #[tokio::test]
2019    #[traced_test]
2020    async fn test_subscription_notification_queue_drain() {
2021        // Reuse common setup with a different port to avoid clash with other tests.
2022        let (mut control_plane_server, mut control_plane_client, client_config) =
2023            setup_control_planes(
2024                "127.0.0.1:50061",
2025                "queue-drain-server",
2026                "queue-drain-client",
2027            )
2028            .await;
2029
2030        let controller = control_plane_client.controller.clone();
2031        assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2032
2033        const N: usize = 5;
2034        for i in 0..N {
2035            let ctrl_msg = ControlMessage {
2036                message_id: uuid::Uuid::new_v4().to_string(),
2037                payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2038                    connections_to_create: vec![],
2039                    connections_to_delete: vec![],
2040                    routes_to_set: vec![v1::Route {
2041                        name: Some(
2042                            ProtoName::from_strings(["queued", "sub", &format!("name-{i}")])
2043                                .with_id(i as u128),
2044                        ),
2045                        link_id: None,
2046                        direction: None,
2047                    }],
2048                    routes_to_delete: vec![],
2049                    reconcile: false,
2050                })),
2051            };
2052            controller
2053                .send_or_queue_notification(ctrl_msg, std::slice::from_ref(&client_config))
2054                .await;
2055        }
2056        assert_eq!(controller.inner.pending_notifications.lock().len(), N);
2057
2058        control_plane_server.run().await.expect("server run failed");
2059        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2060        control_plane_client.run().await.expect("client run failed");
2061        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2062
2063        assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2064        assert!(
2065            logs_contain(&format!("sending {} queued subscription notifications", N)),
2066            "Expected log about sending queued subscription notifications"
2067        );
2068
2069        drop(controller);
2070        drop(control_plane_server);
2071        drop(control_plane_client);
2072    }
2073
2074    #[tokio::test]
2075    #[traced_test]
2076    async fn test_delete_connection_by_link_id_success_ack() {
2077        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2078            setup_control_planes(
2079                "127.0.0.1:50081",
2080                "delete-linkid-server",
2081                "delete-linkid-client",
2082            )
2083            .await;
2084
2085        control_plane_server.run().await.unwrap();
2086        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2087        control_plane_client.run().await.unwrap();
2088        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2089
2090        let controller = control_plane_client.controller.clone();
2091        let link_id = "test-delete-link-id".to_string();
2092        let mut assigned = false;
2093        for _ in 0..50 {
2094            controller
2095                .inner
2096                .message_processor
2097                .connection_table()
2098                .for_each(|_, conn| {
2099                    if !assigned {
2100                        conn.set_link_id(link_id.clone());
2101                        assigned = true;
2102                    }
2103                });
2104            if assigned {
2105                break;
2106            }
2107            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2108        }
2109        assert!(
2110            assigned,
2111            "expected at least one connection to assign link_id"
2112        );
2113
2114        let ctrl_msg = ControlMessage {
2115            message_id: uuid::Uuid::new_v4().to_string(),
2116            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2117                connections_to_create: vec![],
2118                connections_to_delete: vec![link_id.clone()],
2119                routes_to_set: vec![],
2120                routes_to_delete: vec![],
2121                reconcile: false,
2122            })),
2123        };
2124        let (tx, mut rx) = mpsc::channel(1);
2125        controller
2126            .handle_new_control_message(ctrl_msg, &tx)
2127            .await
2128            .expect("config command must be handled");
2129
2130        let ack_msg = rx
2131            .recv()
2132            .await
2133            .expect("expected ack message")
2134            .expect("ack should be ok");
2135        let ack = match ack_msg.payload {
2136            Some(Payload::ConfigCommandAck(ack)) => ack,
2137            _ => panic!("expected ConfigCommandAck payload"),
2138        };
2139        assert_eq!(ack.connections_status.len(), 1);
2140        assert_eq!(ack.connections_status[0].link_id, link_id);
2141        assert!(ack.connections_status[0].success);
2142    }
2143
2144    #[tokio::test]
2145    #[traced_test]
2146    async fn test_delete_connection_by_link_id_unknown_fails_ack() {
2147        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2148            "127.0.0.1:50082",
2149            "delete-linkid-server-unknown",
2150            "delete-linkid-client-unknown",
2151        )
2152        .await;
2153
2154        let controller = control_plane_client.controller.clone();
2155        let ctrl_msg = ControlMessage {
2156            message_id: uuid::Uuid::new_v4().to_string(),
2157            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2158                connections_to_create: vec![],
2159                connections_to_delete: vec!["unknown-link-id".to_string()],
2160                routes_to_set: vec![],
2161                routes_to_delete: vec![],
2162                reconcile: false,
2163            })),
2164        };
2165        let (tx, mut rx) = mpsc::channel(1);
2166        controller
2167            .handle_new_control_message(ctrl_msg, &tx)
2168            .await
2169            .expect("config command must be handled");
2170
2171        let ack_msg = rx
2172            .recv()
2173            .await
2174            .expect("expected ack message")
2175            .expect("ack should be ok");
2176        let ack = match ack_msg.payload {
2177            Some(Payload::ConfigCommandAck(ack)) => ack,
2178            _ => panic!("expected ConfigCommandAck payload"),
2179        };
2180        assert_eq!(ack.connections_status.len(), 1);
2181        assert_eq!(ack.connections_status[0].link_id, "unknown-link-id");
2182        assert!(!ack.connections_status[0].success);
2183        assert!(ack.connections_status[0].error_msg.contains("not found"));
2184
2185        drop(control_plane_server);
2186    }
2187
2188    #[tokio::test]
2189    #[traced_test]
2190    async fn test_create_connection_with_existing_link_id_reuses_connection_ack() {
2191        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2192            setup_control_planes(
2193                "127.0.0.1:50083",
2194                "create-linkid-server",
2195                "create-linkid-client",
2196            )
2197            .await;
2198
2199        control_plane_server.run().await.unwrap();
2200        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2201        control_plane_client.run().await.unwrap();
2202        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2203
2204        let controller = control_plane_client.controller.clone();
2205        let link_id = "test-create-link-id".to_string();
2206        let mut assigned = false;
2207        for _ in 0..50 {
2208            controller
2209                .inner
2210                .message_processor
2211                .connection_table()
2212                .for_each(|_, conn| {
2213                    if !assigned {
2214                        conn.set_link_id(link_id.clone());
2215                        assigned = true;
2216                    }
2217                });
2218            if assigned {
2219                break;
2220            }
2221            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2222        }
2223        assert!(
2224            assigned,
2225            "expected at least one connection to assign link_id"
2226        );
2227
2228        let endpoint = "http://127.0.0.1:59999";
2229        let connection_config = serde_json::json!({
2230            "endpoint": endpoint,
2231            "link_id": link_id
2232        })
2233        .to_string();
2234
2235        let ctrl_msg = ControlMessage {
2236            message_id: uuid::Uuid::new_v4().to_string(),
2237            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2238                connections_to_create: vec![v1::Connection {
2239                    link_id: "reuse-existing-link".to_string(),
2240                    config_data: connection_config,
2241                }],
2242                connections_to_delete: vec![],
2243                routes_to_set: vec![],
2244                routes_to_delete: vec![],
2245                reconcile: false,
2246            })),
2247        };
2248
2249        let (tx, mut rx) = mpsc::channel(1);
2250        controller
2251            .handle_new_control_message(ctrl_msg, &tx)
2252            .await
2253            .expect("config command must be handled");
2254
2255        let ack_msg = rx
2256            .recv()
2257            .await
2258            .expect("expected ack message")
2259            .expect("ack should be ok");
2260        let ack = match ack_msg.payload {
2261            Some(Payload::ConfigCommandAck(ack)) => ack,
2262            _ => panic!("expected ConfigCommandAck payload"),
2263        };
2264        assert_eq!(ack.connections_status.len(), 1);
2265        assert_eq!(ack.connections_status[0].link_id, "reuse-existing-link");
2266        assert!(ack.connections_status[0].success);
2267
2268        assert!(
2269            controller
2270                .inner
2271                .link_id_to_conn_id
2272                .read()
2273                .contains_key(&link_id),
2274            "expected link_id to be mapped to reused connection id"
2275        );
2276    }
2277
2278    #[tokio::test]
2279    #[traced_test]
2280    async fn test_subscription_set_unknown_link_id_fails_ack() {
2281        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2282            "127.0.0.1:50084",
2283            "sub-linkid-server-unknown",
2284            "sub-linkid-client-unknown",
2285        )
2286        .await;
2287
2288        let controller = control_plane_client.controller.clone();
2289        let ctrl_msg = ControlMessage {
2290            message_id: uuid::Uuid::new_v4().to_string(),
2291            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2292                connections_to_create: vec![],
2293                connections_to_delete: vec![],
2294                routes_to_set: vec![v1::Route {
2295                    name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2296                    link_id: Some("missing-link-id".to_string()),
2297                    direction: None,
2298                }],
2299                routes_to_delete: vec![],
2300                reconcile: false,
2301            })),
2302        };
2303        let (tx, mut rx) = mpsc::channel(1);
2304        controller
2305            .handle_new_control_message(ctrl_msg, &tx)
2306            .await
2307            .expect("config command must be handled");
2308
2309        let ack_msg = rx
2310            .recv()
2311            .await
2312            .expect("expected ack message")
2313            .expect("ack should be ok");
2314        let ack = match ack_msg.payload {
2315            Some(Payload::ConfigCommandAck(ack)) => ack,
2316            _ => panic!("expected ConfigCommandAck payload"),
2317        };
2318
2319        assert_eq!(ack.routes_status.len(), 1);
2320        assert!(!ack.routes_status[0].success);
2321        assert!(
2322            ack.routes_status[0]
2323                .error_msg
2324                .contains("Connection with link_id missing-link-id not found")
2325        );
2326
2327        drop(control_plane_server);
2328    }
2329
2330    #[tokio::test]
2331    #[traced_test]
2332    async fn test_create_connection_invalid_config_fails_ack() {
2333        let (_control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2334            "127.0.0.1:50085",
2335            "create-invalid-config-server",
2336            "create-invalid-config-client",
2337        )
2338        .await;
2339
2340        let controller = control_plane_client.controller.clone();
2341        let ctrl_msg = ControlMessage {
2342            message_id: uuid::Uuid::new_v4().to_string(),
2343            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2344                connections_to_create: vec![v1::Connection {
2345                    link_id: "invalid-config-conn".to_string(),
2346                    config_data: "{invalid-json".to_string(),
2347                }],
2348                connections_to_delete: vec![],
2349                routes_to_set: vec![],
2350                routes_to_delete: vec![],
2351                reconcile: false,
2352            })),
2353        };
2354        let (tx, mut rx) = mpsc::channel(1);
2355        controller
2356            .handle_new_control_message(ctrl_msg, &tx)
2357            .await
2358            .expect("config command must be handled");
2359
2360        let ack_msg = rx
2361            .recv()
2362            .await
2363            .expect("expected ack message")
2364            .expect("ack should be ok");
2365        let ack = match ack_msg.payload {
2366            Some(Payload::ConfigCommandAck(ack)) => ack,
2367            _ => panic!("expected ConfigCommandAck payload"),
2368        };
2369        assert_eq!(ack.connections_status.len(), 1);
2370        assert!(!ack.connections_status[0].success);
2371        assert!(
2372            ack.connections_status[0]
2373                .error_msg
2374                .contains("Failed to parse config")
2375        );
2376    }
2377
2378    #[tokio::test]
2379    #[traced_test]
2380    async fn test_subscription_delete_unknown_link_id_fails_ack() {
2381        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2382            "127.0.0.1:50086",
2383            "sub-del-linkid-server-unknown",
2384            "sub-del-linkid-client-unknown",
2385        )
2386        .await;
2387
2388        let controller = control_plane_client.controller.clone();
2389        let ctrl_msg = ControlMessage {
2390            message_id: uuid::Uuid::new_v4().to_string(),
2391            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2392                connections_to_create: vec![],
2393                connections_to_delete: vec![],
2394                routes_to_set: vec![],
2395                routes_to_delete: vec![v1::Route {
2396                    name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2397                    link_id: Some("missing-link-id-delete".to_string()),
2398                    direction: None,
2399                }],
2400                reconcile: false,
2401            })),
2402        };
2403        let (tx, mut rx) = mpsc::channel(1);
2404        controller
2405            .handle_new_control_message(ctrl_msg, &tx)
2406            .await
2407            .expect("config command must be handled");
2408
2409        let ack_msg = rx
2410            .recv()
2411            .await
2412            .expect("expected ack message")
2413            .expect("ack should be ok");
2414        let ack = match ack_msg.payload {
2415            Some(Payload::ConfigCommandAck(ack)) => ack,
2416            _ => panic!("expected ConfigCommandAck payload"),
2417        };
2418
2419        assert_eq!(ack.routes_status.len(), 1);
2420        assert!(!ack.routes_status[0].success);
2421        assert!(
2422            ack.routes_status[0]
2423                .error_msg
2424                .contains("Connection with link_id missing-link-id-delete not found")
2425        );
2426
2427        drop(control_plane_server);
2428    }
2429
2430    #[tokio::test]
2431    #[traced_test]
2432    async fn test_shutdown_drains_resources() {
2433        // Use a unique port to avoid conflicts with other tests.
2434        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2435            setup_control_planes(
2436                "127.0.0.1:50071",
2437                "shutdown-server-instance",
2438                "shutdown-client-instance",
2439            )
2440            .await;
2441
2442        // Run both ends to populate cancellation tokens.
2443        control_plane_server
2444            .run()
2445            .await
2446            .expect("server should start");
2447        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2448        control_plane_client
2449            .run()
2450            .await
2451            .expect("client should start");
2452        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2453
2454        // Ensure we have at least one cancellation token (server or client side tasks).
2455        let server_tokens_before = control_plane_server
2456            .controller
2457            .inner
2458            .cancellation_tokens
2459            .read()
2460            .len();
2461        assert!(
2462            server_tokens_before > 0,
2463            "expected server to have active cancellation tokens before shutdown"
2464        );
2465
2466        let client_tokens_before = control_plane_client
2467            .controller
2468            .inner
2469            .cancellation_tokens
2470            .read()
2471            .len();
2472        assert!(
2473            client_tokens_before > 0,
2474            "expected client to have active cancellation tokens before shutdown"
2475        );
2476
2477        // Perform shutdown on both.
2478        control_plane_client
2479            .shutdown()
2480            .await
2481            .expect("client shutdown ok");
2482        control_plane_server
2483            .shutdown()
2484            .await
2485            .expect("server shutdown ok");
2486
2487        // After shutdown, all cancellation tokens should be drained.
2488        let server_tokens_after = control_plane_server
2489            .controller
2490            .inner
2491            .cancellation_tokens
2492            .read()
2493            .len();
2494        assert_eq!(
2495            server_tokens_after, 0,
2496            "expected server cancellation tokens to be drained after shutdown"
2497        );
2498
2499        let client_tokens_after = control_plane_client
2500            .controller
2501            .inner
2502            .cancellation_tokens
2503            .read()
2504            .len();
2505        assert_eq!(
2506            client_tokens_after, 0,
2507            "expected client cancellation tokens to be drained after shutdown"
2508        );
2509
2510        // Second shutdown should error because drain_signal has been taken.
2511        assert!(
2512            control_plane_server.shutdown().await.is_err(),
2513            "second shutdown on server should return an error"
2514        );
2515        assert!(
2516            control_plane_client.shutdown().await.is_err(),
2517            "second shutdown on client should return an error"
2518        );
2519    }
2520
2521    #[tokio::test]
2522    #[traced_test]
2523    async fn test_shutdown_without_run() {
2524        // Build a control plane but do NOT call run()
2525        let (control_plane_server, mut _control_plane_client, _client_cfg) = setup_control_planes(
2526            "127.0.0.1:50072",
2527            "shutdown-no-run-server",
2528            "shutdown-no-run-client",
2529        )
2530        .await;
2531
2532        // No tasks should be registered yet
2533        assert_eq!(
2534            control_plane_server
2535                .controller
2536                .inner
2537                .cancellation_tokens
2538                .read()
2539                .len(),
2540            0,
2541            "expected zero cancellation tokens before shutdown when not run"
2542        );
2543
2544        // Shutdown should still succeed gracefully.
2545        control_plane_server
2546            .shutdown()
2547            .await
2548            .expect("shutdown without prior run should succeed");
2549
2550        // Tokens remain zero.
2551        assert_eq!(
2552            control_plane_server
2553                .controller
2554                .inner
2555                .cancellation_tokens
2556                .read()
2557                .len(),
2558            0,
2559            "expected zero cancellation tokens after shutdown when not run"
2560        );
2561
2562        // Second shutdown should fail.
2563        assert!(
2564            control_plane_server.shutdown().await.is_err(),
2565            "second shutdown should error due to missing drain signal"
2566        );
2567    }
2568}