Skip to main content

slim_controller/
service.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{HashMap, VecDeque};
5use std::pin::Pin;
6use std::sync::Arc;
7
8use std::time::Duration;
9
10use display_error_chain::ErrorChainExt;
11use slim_config::server::ServerConfig;
12use slim_session::subscription_manager::SubscriptionManager;
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
16use tokio_util::sync::CancellationToken;
17use tonic::{Request, Response, Status};
18use tracing::{debug, error, info};
19
20use crate::api::proto::api::v1::control_message::Payload;
21use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
22use crate::api::proto::api::v1::{
23    self, ConnectionDetails, ConnectionDirection, ConnectionListResponse, ConnectionType,
24    RouteListResponse,
25};
26use crate::api::proto::api::v1::{
27    ConnectionEntry, ControlMessage, RouteEntry,
28    controller_service_client::ControllerServiceClient,
29    controller_service_server::ControllerService as GrpcControllerService,
30};
31use crate::errors::ControllerError;
32use prost_types::Struct;
33use slim_config::client::{ClientConfig, TransportChannel};
34use slim_datapath::api::{
35    MessageType::Subscribe, MessageType::SubscriptionAck as SubscriptionAckType,
36    MessageType::Unsubscribe, ProtoMessage as DataPlaneMessage,
37};
38use slim_datapath::api::{NameId, ProtoName};
39use slim_datapath::message_processing::MessageProcessor;
40use slim_datapath::messages::utils::SlimHeaderFlags;
41use slim_datapath::tables::SubscriptionTable;
42
43type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
44type TxChannels = HashMap<String, TxChannel>;
45
/// Upper bound on the number of subscription notifications that may be queued.
const MAX_QUEUED_NOTIFICATIONS: usize = 1_000;

/// How long to wait for the datapath to acknowledge a subscription (30 s).
const SUBSCRIPTION_ACK_TIMEOUT: Duration = Duration::from_millis(30_000);
51
52/// Settings struct for creating a ControlPlane instance
53#[derive(Clone)]
54pub struct ControlPlaneSettings {
55    /// Node ID of this SLIM instance
56    pub id: String,
57    /// Optional group name
58    pub group_name: Option<String>,
59    /// Server configurations
60    pub servers: Vec<ServerConfig>,
61    /// Client configurations
62    pub clients: Vec<ClientConfig>,
63    /// Message processor instance
64    pub message_processor: MessageProcessor,
65    /// array of connection details used by the control
66    /// plane to store the connection settings (e.g., TLS settings).
67    pub connection_details: Vec<ConnectionDetails>,
68}
69
70/// Inner structure for the controller service
71/// This structure holds the internal state of the controller service,
72/// including the ID, message processor, connections, and channels.
73/// It is normally wrapped in an Arc to allow shared ownership across multiple threads.
74struct ControllerServiceInternal {
75    /// Node ID of this SLIM instance
76    id: String,
77
78    /// optional group name
79    group_name: Option<String>,
80
81    /// underlying message processor
82    message_processor: MessageProcessor,
83
84    /// channel to send messages into the datapath
85    tx_slim: mpsc::Sender<Result<DataPlaneMessage, Status>>,
86
87    /// channels to send control messages
88    tx_channels: parking_lot::RwLock<TxChannels>,
89
90    /// cancellation token for graceful shutdown
91    cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,
92
93    /// drain watch channel
94    drain_watch: parking_lot::RwLock<Option<drain::Watch>>,
95
96    /// queue for pending subscription notifications when connections are down
97    pending_notifications: Arc<parking_lot::Mutex<VecDeque<ControlMessage>>>,
98
99    /// Manages pending subscription ack tracking (id generation, registration, resolution).
100    subscription_manager: SubscriptionManager,
101
102    /// connection details used by control plane to store connection settings
103    connection_details: Vec<ConnectionDetails>,
104
105    /// Maps (subscription_name, link_id) → subscription_id for route tracking
106    route_subscription_ids: parking_lot::Mutex<HashMap<(ProtoName, u64), u64>>,
107
108    /// Reverse index: link_id → conn_id for O(1) lookup.
109    /// Populated lazily on first resolve and eagerly on connection create/delete.
110    link_id_to_conn_id: parking_lot::RwLock<HashMap<String, u64>>,
111
112    /// JoinHandles for control-plane stream processing tasks, keyed by endpoint.
113    stream_handles: parking_lot::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
114}
115
116#[derive(Clone)]
117struct ControllerService {
118    /// internal service state
119    inner: Arc<ControllerServiceInternal>,
120}
121
122/// The ControlPlane service is the main entry point for the controller service.
123pub struct ControlPlane {
124    /// servers
125    servers: Vec<ServerConfig>,
126
127    /// clients
128    clients: Vec<ClientConfig>,
129
130    /// drain signal channel
131    drain_signal: parking_lot::RwLock<Option<drain::Signal>>,
132
133    /// controller
134    controller: ControllerService,
135
136    /// channel to receive message from the datapath
137    /// to be used in listen_from_data_plane
138    rx_slim_option: Option<mpsc::Receiver<Result<DataPlaneMessage, Status>>>,
139}
140
141/// ControllerServiceInternal implements Drop trait to cancel all running listeners and
142/// clean up resources.
143impl Drop for ControlPlane {
144    fn drop(&mut self) {
145        // cancel all running listeners
146        for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
147            token.cancel();
148        }
149    }
150}
151
152pub(crate) fn from_server_config(server_config: &ServerConfig) -> ConnectionDetails {
153    let mut endpoint = server_config.endpoint.clone();
154    let mut external_endpoint = None;
155    let mut spire_socket_path = None;
156    let mut trust_domain = None;
157    let mut remaining_fields = std::collections::BTreeMap::new();
158
159    if let Some(m) = &server_config.metadata {
160        for (k, v) in &m.inner {
161            match k.as_str() {
162                "local_endpoint" => {
163                    if let Some(s) = v.as_str()
164                        && !s.is_empty()
165                    {
166                        endpoint = s.to_string();
167                    }
168                }
169                "external_endpoint" => {
170                    if let Some(s) = v.as_str()
171                        && !s.is_empty()
172                    {
173                        external_endpoint = Some(s.to_string());
174                    }
175                }
176                "spire_socket_path" => {
177                    if let Some(s) = v.as_str()
178                        && !s.is_empty()
179                    {
180                        spire_socket_path = Some(s.to_string());
181                    }
182                }
183                "trust_domain" => {
184                    if let Some(s) = v.as_str()
185                        && !s.is_empty()
186                    {
187                        trust_domain = Some(s.to_string());
188                    }
189                }
190                _ => {
191                    remaining_fields.insert(k.clone(), prost_types::Value::from(v));
192                }
193            }
194        }
195    }
196
197    let spire_mtls = if !server_config.tls_setting.insecure || spire_socket_path.is_some() {
198        Some(v1::connection_details::SpireMtls {
199            socket_path: spire_socket_path.unwrap_or_default(),
200            trust_domain,
201        })
202    } else {
203        None
204    };
205
206    let metadata = if remaining_fields.is_empty() {
207        None
208    } else {
209        Some(Struct {
210            fields: remaining_fields,
211        })
212    };
213
214    ConnectionDetails {
215        endpoint,
216        external_endpoint,
217        spire_mtls,
218        metadata,
219    }
220}
221
222/// ControlPlane implements the service trait for the controller service.
223impl ControlPlane {
224    /// Create a new ControlPlane service instance
225    /// This function initializes the ControlPlane with the given ID, servers, clients, and message processor.
226    /// It also sets up the internal state, including the connections and channels.
227    /// # Arguments
228    /// * `id` - The ID of the SLIM instance.
229    /// * `servers` - A vector of server configurations.
230    /// * `clients` - A vector of client configurations.
231    /// * `drain_rx` - A drain watch channel for graceful shutdown.
232    /// * `message_processor` - An Arc to the message processor instance.
233    /// * `pubsub_servers` - A slice of server configurations for pub/sub connections.
234    /// # Returns
235    /// A new instance of ControlPlane.
236    pub fn new(config: ControlPlaneSettings) -> Self {
237        // create local connection with the message processor
238        let (_, tx_slim, rx_slim) = config
239            .message_processor
240            .register_local_connection(true)
241            .unwrap();
242
243        let (signal, watch) = drain::channel();
244
245        ControlPlane {
246            servers: config.servers,
247            clients: config.clients,
248            controller: ControllerService {
249                inner: Arc::new(ControllerServiceInternal {
250                    id: config.id,
251                    group_name: config.group_name,
252                    message_processor: config.message_processor,
253                    subscription_manager: SubscriptionManager::new(tx_slim.clone()),
254                    tx_slim,
255                    tx_channels: parking_lot::RwLock::new(HashMap::new()),
256                    cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
257                    drain_watch: parking_lot::RwLock::new(Some(watch)),
258                    pending_notifications: Arc::new(parking_lot::Mutex::new(VecDeque::new())),
259                    connection_details: config.connection_details,
260                    route_subscription_ids: parking_lot::Mutex::new(HashMap::new()),
261                    link_id_to_conn_id: parking_lot::RwLock::new(HashMap::new()),
262                    stream_handles: parking_lot::Mutex::new(HashMap::new()),
263                }),
264            },
265            drain_signal: parking_lot::RwLock::new(Some(signal)),
266            rx_slim_option: Some(rx_slim),
267        }
268    }
269
270    /// Take an existing ControlPlane instance and return a new one with the provided clients.
271    pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
272        self.clients = clients;
273        self
274    }
275
276    /// Take an existing ControlPlane instance and return a new one with the provided servers.
277    pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
278        self.servers = servers;
279        self
280    }
281
282    /// Run the clients and servers of the ControlPlane service.
283    /// This function starts all the servers and clients defined in the ControlPlane.
284    /// # Returns
285    /// A Result indicating success or failure of the operation.
286    /// # Errors
287    /// If there is an error starting any of the servers or clients, it will return a ControllerError.
288    pub async fn run(&mut self) -> Result<(), ControllerError> {
289        let rx = self
290            .rx_slim_option
291            .take()
292            .ok_or(ControllerError::AlreadyStarted)?;
293
294        // Collect servers to avoid borrowing self both mutably and immutably
295        let servers = self.servers.clone();
296        let clients = self.clients.clone();
297
298        // run all servers
299        for server in servers {
300            self.run_server(server).await?;
301        }
302
303        // run all clients
304        for client in clients {
305            self.run_client(client).await?;
306        }
307
308        self.listen_from_data_plane(rx).await?;
309
310        Ok(())
311    }
312
313    pub async fn deregister(&self) -> Result<(), ControllerError> {
314        let node_id = self.controller.inner.id.clone();
315        let deregister_msg = ControlMessage {
316            message_id: uuid::Uuid::new_v4().to_string(),
317            payload: Some(Payload::DeregisterNodeRequest(v1::DeregisterNodeRequest {
318                node: Some(v1::Node { id: node_id }),
319            })),
320        };
321        let channels: Vec<(String, TxChannel)> = self
322            .controller
323            .inner
324            .tx_channels
325            .read()
326            .iter()
327            .map(|(ep, tx)| (ep.clone(), tx.clone()))
328            .collect();
329        for (endpoint, tx) in channels {
330            if let Err(e) = tx.send(Ok(deregister_msg.clone())).await {
331                error!(%endpoint, error = %e, "failed to send deregister request");
332            }
333        }
334        Ok(())
335    }
336
337    pub async fn shutdown(&self) -> Result<(), ControllerError> {
338        // Get signal drain
339        let signal = self
340            .drain_signal
341            .write()
342            .take()
343            .ok_or(ControllerError::AlreadyStopped)?;
344
345        // Stop everything using the cancellation tokens
346        self.controller
347            .inner
348            .cancellation_tokens
349            .write()
350            .drain()
351            .for_each(|(endpoint, token)| {
352                info!(%endpoint, "stopping");
353                token.cancel();
354            });
355
356        // Drop watch channel
357        self.controller.inner.drain_watch.write().take();
358
359        // Wait for drain to complete
360        signal.drain().await;
361
362        Ok(())
363    }
364
365    async fn listen_from_data_plane(
366        &mut self,
367        mut rx: mpsc::Receiver<Result<DataPlaneMessage, Status>>,
368    ) -> Result<(), ControllerError> {
369        let cancellation_token = CancellationToken::new();
370        let cancellation_token_clone = cancellation_token.clone();
371
372        self.controller
373            .inner
374            .cancellation_tokens
375            .write()
376            .insert("DATA_PLANE".to_string(), cancellation_token_clone);
377
378        let clients = self.clients.clone();
379        let controller = self.controller.clone();
380
381        // Get a drain watch clone
382        let watch = self.controller.drain_watch()?;
383
384        debug!("Starting data plane listener");
385        tokio::spawn(async move {
386            let mut drain_fut = std::pin::pin!(watch.signaled());
387            loop {
388                tokio::select! {
389                    next = rx.recv() => {
390                        match next {
391                            Some(res) => {
392                                match res {
393                                    Ok(msg) => {
394                                        debug!("Send sub/unsub to control plane for message: {:?}", msg);
395                                        match msg.get_type() {
396                                            Subscribe(_) => {
397                                                controller.handle_subscribe_message(msg.get_dst(), &clients).await;
398                                            }
399                                            Unsubscribe(_) => {
400                                                controller.handle_unsubscribe_message(msg.get_dst(), &clients).await;
401                                            }
402                                            SubscriptionAckType(_) => {
403                                                controller.inner.subscription_manager.resolve_ack(msg.get_subscription_ack());
404                                            }
405                                            _ => {
406                                                debug!("Ignoring unexpected message type from dataplane: {:?}", msg.get_type());
407                                            }
408                                        }
409                                    }
410                                    Err(e) => {
411                                        error!(error = %e.chain(), "received error from the data plane");
412                                        continue;
413                                    }
414                                }
415                            }
416                            None => {
417                                debug!("Data plane receiver channel closed.");
418                                break;
419                            }
420                        }
421                    }
422                    _ = cancellation_token.cancelled() => {
423                        debug!("shutting down stream on cancellation token");
424                        break;
425                    }
426                    _ = &mut drain_fut => {
427                        debug!("shutting down stream on drain");
428                        break;
429                    }
430                }
431            }
432        });
433        Ok(())
434    }
435
436    /// Stop the ControlPlane service.
437    /// This function stops all running listeners and cancels any ongoing operations.
438    /// It cleans up the internal state and ensures that all resources are released properly.
439    pub fn stop(&mut self) {
440        info!("stopping controller service");
441
442        // cancel all running listeners
443        for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
444            info!(%endpoint, "stopping");
445            token.cancel();
446        }
447    }
448
449    /// Run a client configuration.
450    /// This function connects to the control plane using the provided client configuration.
451    /// It checks if the client is already running and if not, it starts a new connection.
452    async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
453        if self
454            .controller
455            .inner
456            .cancellation_tokens
457            .read()
458            .contains_key(&client.endpoint)
459        {
460            return Err(ControllerError::ClientAlreadyRunning(client.endpoint));
461        }
462
463        let cancellation_token = CancellationToken::new();
464
465        let tx = self
466            .controller
467            .connect(client.clone(), cancellation_token.clone())
468            .await?;
469
470        // Store the cancellation token in the controller service
471        self.controller
472            .inner
473            .cancellation_tokens
474            .write()
475            .insert(client.endpoint.clone(), cancellation_token);
476
477        // Store the sender in the tx_channels map
478        self.controller
479            .inner
480            .tx_channels
481            .write()
482            .insert(client.endpoint.clone(), tx);
483
484        // return the sender for control messages
485        Ok(())
486    }
487
488    /// Run a server configuration.
489    /// This function starts a server using the provided server configuration.
490    /// It checks if the server is already running and if not, it starts a new server.
491    pub async fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
492        // Check if the server is already running
493        if self
494            .controller
495            .inner
496            .cancellation_tokens
497            .read()
498            .contains_key(&config.endpoint)
499        {
500            error!(endpoint = config.endpoint, "server is already running",);
501            return Err(ControllerError::ServerAlreadyRunning(config.endpoint));
502        }
503
504        let token = config
505            .run_grpc_server(
506                &[ControllerServiceServer::new(self.controller.clone())],
507                self.controller.drain_watch()?,
508            )
509            .await?;
510
511        // Store the cancellation token in the controller service
512        self.controller
513            .inner
514            .cancellation_tokens
515            .write()
516            .insert(config.endpoint.clone(), token.clone());
517
518        info!(%config.endpoint, "started controlplane server");
519
520        Ok(())
521    }
522}
523
524impl ControllerService {
525    fn resolve_connection_by_link_id(&self, link_id: &str) -> Result<Option<u64>, String> {
526        let cached = self.inner.link_id_to_conn_id.read().get(link_id).copied();
527        if let Some(conn_id) = cached {
528            if self
529                .inner
530                .message_processor
531                .connection_table()
532                .get(conn_id)
533                .is_some_and(|conn| conn.link_id().as_deref() == Some(link_id))
534            {
535                return Ok(Some(conn_id));
536            }
537            self.inner.link_id_to_conn_id.write().remove(link_id);
538        }
539
540        let mut resolved: Option<u64> = None;
541        self.inner
542            .message_processor
543            .connection_table()
544            .for_each(|id, conn| {
545                if conn.link_id().as_deref() == Some(link_id) && resolved.is_none() {
546                    resolved = Some(id);
547                }
548            });
549
550        if let Some(conn_id) = resolved {
551            self.inner
552                .link_id_to_conn_id
553                .write()
554                .insert(link_id.to_string(), conn_id);
555        }
556
557        Ok(resolved)
558    }
559
560    fn disconnect_connection_by_link_id(&self, link_id: &str) -> Result<(), String> {
561        if link_id.trim().is_empty() {
562            return Err("link_id cannot be empty".to_string());
563        }
564
565        let conn_id = match self.resolve_connection_by_link_id(link_id)? {
566            Some(id) => id,
567            None => {
568                return Err(format!("Connection with link_id {} not found", link_id));
569            }
570        };
571
572        if let Err(e) = self.inner.message_processor.disconnect(conn_id) {
573            // Best-effort delete: local/control-plane connections can lack config_data.
574            info!(
575                link_id = %link_id,
576                conn_id,
577                error = %e,
578                "Disconnect returned an error; continuing delete flow"
579            );
580        }
581
582        self.inner.link_id_to_conn_id.write().remove(link_id);
583        self.inner
584            .route_subscription_ids
585            .lock()
586            .retain(|(_name, cid), _| *cid != conn_id);
587
588        info!(link_id = %link_id, conn_id, "Successfully deleted connection by link_id");
589        Ok(())
590    }
591
592    fn resolve_route_connection(&self, route: &v1::Route) -> Result<Option<u64>, String> {
593        if let Some(link_id) = &route.link_id {
594            let trimmed = link_id.trim();
595            if !trimmed.is_empty() {
596                return self.resolve_connection_by_link_id(trimmed);
597            }
598        }
599
600        Ok(None)
601    }
602
603    async fn diff_connections(
604        &self,
605        desired_connections: &[v1::Connection],
606    ) -> Vec<v1::ConnectionAck> {
607        let mut connections_status = Vec::new();
608
609        let desired_link_ids: std::collections::HashSet<String> = desired_connections
610            .iter()
611            .map(|c| c.link_id.clone())
612            .collect();
613
614        let mut live_outgoing_link_ids: Vec<String> = Vec::new();
615        self.inner
616            .message_processor
617            .connection_table()
618            .for_each(|_id, conn| {
619                if conn.is_outgoing()
620                    && let Some(lid) = conn.link_id()
621                    && !lid.is_empty()
622                {
623                    live_outgoing_link_ids.push(lid);
624                }
625            });
626
627        for link_id in &live_outgoing_link_ids {
628            if desired_link_ids.contains(link_id) {
629                continue;
630            }
631            info!(link_id = %link_id, "desired state: removing connection");
632            let mut success = true;
633            let mut error_msg = String::new();
634            if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
635                success = false;
636                error_msg = err;
637            }
638            connections_status.push(v1::ConnectionAck {
639                link_id: link_id.clone(),
640                success,
641                error_msg,
642            });
643        }
644
645        for conn in desired_connections {
646            let link_id = &conn.link_id;
647            if link_id.is_empty() {
648                continue;
649            }
650
651            let already_exists = match self.resolve_connection_by_link_id(link_id) {
652                Ok(Some(_)) => true,
653                Ok(None) => false,
654                Err(err) => {
655                    connections_status.push(v1::ConnectionAck {
656                        link_id: link_id.clone(),
657                        success: false,
658                        error_msg: err,
659                    });
660                    continue;
661                }
662            };
663
664            if already_exists {
665                connections_status.push(v1::ConnectionAck {
666                    link_id: link_id.clone(),
667                    success: true,
668                    error_msg: String::new(),
669                });
670                continue;
671            }
672
673            info!(?conn, "desired state: creating connection");
674            let mut success = true;
675            let mut error_msg = String::new();
676
677            match serde_json::from_str::<ClientConfig>(&conn.config_data) {
678                Err(e) => {
679                    success = false;
680                    error_msg = format!("Failed to parse config: {}", e);
681                }
682                Ok(client_config) => {
683                    match self
684                        .inner
685                        .message_processor
686                        .connect(client_config.clone(), None, None)
687                        .await
688                    {
689                        Err(e) => {
690                            success = false;
691                            error_msg = format!("Connection failed: {}", e);
692                        }
693                        Ok(conn_id) => {
694                            let requested_link_id = if client_config.link_id.trim().is_empty() {
695                                String::new()
696                            } else {
697                                client_config.link_id.clone()
698                            };
699                            if !requested_link_id.is_empty() {
700                                self.inner
701                                    .link_id_to_conn_id
702                                    .write()
703                                    .insert(requested_link_id, conn_id.1);
704                            }
705                            info!(
706                                link_id = %link_id,
707                                "Successfully created connection"
708                            );
709                        }
710                    }
711                }
712            }
713
714            connections_status.push(v1::ConnectionAck {
715                link_id: link_id.clone(),
716                success,
717                error_msg,
718            });
719        }
720
721        connections_status
722    }
723
724    fn resolve_desired_routes<'a>(
725        &self,
726        desired_routes: &'a [v1::Route],
727    ) -> (HashMap<(ProtoName, u64), &'a v1::Route>, Vec<v1::RouteAck>) {
728        type SubKey = (ProtoName, u64);
729        let mut desired_subs: HashMap<SubKey, &v1::Route> = HashMap::new();
730        let mut failures: Vec<v1::RouteAck> = Vec::new();
731
732        for sub in desired_routes {
733            match self.resolve_route_connection(sub) {
734                Ok(Some(conn_id)) => {
735                    let name = sub.name.clone().unwrap();
736                    desired_subs.insert((name, conn_id), sub);
737                }
738                Ok(None) => {
739                    failures.push(v1::RouteAck {
740                        route: Some(sub.clone()),
741                        success: false,
742                        error_msg: "connection not found".to_string(),
743                    });
744                }
745                Err(err) => {
746                    failures.push(v1::RouteAck {
747                        route: Some(sub.clone()),
748                        success: false,
749                        error_msg: err,
750                    });
751                }
752            }
753        }
754
755        (desired_subs, failures)
756    }
757
758    async fn delete_stale_subscriptions(
759        &self,
760        desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
761    ) -> Vec<v1::RouteAck> {
762        let active_subs: Vec<((ProtoName, u64), u64)> = self
763            .inner
764            .route_subscription_ids
765            .lock()
766            .iter()
767            .map(|((name, conn_id), sub_id)| ((name.clone(), *conn_id), *sub_id))
768            .collect();
769
770        let stale: Vec<_> = active_subs
771            .into_iter()
772            .filter(|((name, conn_id), _)| !desired_subs.contains_key(&(name.clone(), *conn_id)))
773            .collect();
774
775        let futs = stale.iter().map(|((name, conn_id), sub_id)| {
776            let name = name.clone();
777            let conn_id = *conn_id;
778            let sub_id = *sub_id;
779            async move {
780                let conn_alive = self
781                    .inner
782                    .message_processor
783                    .connection_table()
784                    .get(conn_id)
785                    .is_some();
786
787                let (success, error_msg) = if conn_alive {
788                    let source = name.clone().with_id(0);
789                    let unsub_msg = DataPlaneMessage::builder()
790                        .source(source)
791                        .destination(name.clone())
792                        .identity("")
793                        .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
794                        .build_unsubscribe()
795                        .unwrap();
796
797                    match self
798                        .send_unsubscribe_message_with_ack(unsub_msg, sub_id)
799                        .await
800                    {
801                        Ok(()) => (true, String::new()),
802                        Err(err) => (false, format!("Failed to unsubscribe: {}", err)),
803                    }
804                } else {
805                    (true, String::new())
806                };
807
808                (name, conn_id, success, error_msg)
809            }
810        });
811
812        let results = futures::future::join_all(futs).await;
813
814        let mut routes_status = Vec::with_capacity(results.len());
815        for (name, conn_id, success, error_msg) in results {
816            if success {
817                self.inner
818                    .route_subscription_ids
819                    .lock()
820                    .remove(&(name.clone(), conn_id));
821            }
822
823            routes_status.push(v1::RouteAck {
824                route: Some(v1::Route {
825                    name: Some(name.clone()),
826                    link_id: None,
827                    direction: None,
828                }),
829                success,
830                error_msg,
831            });
832        }
833
834        routes_status
835    }
836
837    async fn create_new_subscriptions(
838        &self,
839        desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
840    ) -> Vec<v1::RouteAck> {
841        let mut routes_status = Vec::new();
842
843        let to_create: Vec<((ProtoName, u64), v1::Route)> = desired_subs
844            .iter()
845            .filter(|((name, conn_id), _)| {
846                !self
847                    .inner
848                    .route_subscription_ids
849                    .lock()
850                    .contains_key(&(name.clone(), *conn_id))
851            })
852            .map(|((name, conn_id), sub)| ((name.clone(), *conn_id), (*sub).clone()))
853            .collect();
854
855        // Report already-active subscriptions as success.
856        for ((name, conn_id), sub) in desired_subs {
857            let dominated = to_create
858                .iter()
859                .any(|((n, c), _)| n == name && *c == *conn_id);
860            if !dominated {
861                routes_status.push(v1::RouteAck {
862                    route: Some((*sub).clone()),
863                    success: true,
864                    error_msg: String::new(),
865                });
866            }
867        }
868
869        let futs = to_create.iter().map(|((name, conn_id), sub)| {
870            let name = name.clone();
871            let conn_id = *conn_id;
872            let sub = sub.clone();
873            async move {
874                let source = name.clone().with_id(0);
875                let sub_msg = DataPlaneMessage::builder()
876                    .source(source)
877                    .destination(name.clone())
878                    .identity("")
879                    .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
880                    .build_subscribe()
881                    .unwrap();
882
883                let result = self.send_subscribe_message_with_ack(sub_msg).await;
884                (name, conn_id, sub, result)
885            }
886        });
887
888        let results = futures::future::join_all(futs).await;
889
890        for (name, conn_id, sub, result) in results {
891            let (success, error_msg) = match result {
892                Ok(subscription_id) => {
893                    self.inner
894                        .route_subscription_ids
895                        .lock()
896                        .insert((name, conn_id), subscription_id);
897                    info!(?sub, "desired state: created route");
898                    (true, String::new())
899                }
900                Err(err) => (false, format!("Failed to subscribe: {}", err)),
901            };
902
903            routes_status.push(v1::RouteAck {
904                route: Some(sub),
905                success,
906                error_msg,
907            });
908        }
909
910        routes_status
911    }
912
913    /// Handle new control messages.
914    async fn handle_new_control_message(
915        &self,
916        msg: ControlMessage,
917        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
918    ) -> Result<(), ControllerError> {
919        match msg.payload {
920            Some(ref payload) => {
921                match payload {
922                    Payload::ConfigCommand(config) => {
923                        let (connections_status, routes_status) = if config.reconcile {
924                            let connections_status =
925                                self.diff_connections(&config.connections_to_create).await;
926                            let (desired_subs, mut routes_status) =
927                                self.resolve_desired_routes(&config.routes_to_set);
928                            routes_status
929                                .extend(self.delete_stale_subscriptions(&desired_subs).await);
930                            routes_status
931                                .extend(self.create_new_subscriptions(&desired_subs).await);
932                            (connections_status, routes_status)
933                        } else {
934                            let mut connections_status = Vec::new();
935                            let mut routes_status = Vec::new();
936
937                            // Process connections to delete by link_id.
938                            for link_id in &config.connections_to_delete {
939                                info!(link_id = %link_id, "received a connection to delete");
940                                let mut connection_success = true;
941                                let mut connection_error_msg = String::new();
942
943                                if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
944                                    connection_success = false;
945                                    connection_error_msg = err;
946                                }
947
948                                connections_status.push(v1::ConnectionAck {
949                                    link_id: link_id.clone(),
950                                    success: connection_success,
951                                    error_msg: connection_error_msg,
952                                });
953                            }
954
955                            // Process connections to create
956                            for conn in &config.connections_to_create {
957                                info!(?conn, "received a connection to create");
958                                let mut connection_success = true;
959                                let mut connection_error_msg = String::new();
960
961                                match serde_json::from_str::<ClientConfig>(&conn.config_data) {
962                                    Err(e) => {
963                                        connection_success = false;
964                                        connection_error_msg =
965                                            format!("Failed to parse config: {}", e);
966                                    }
967                                    Ok(client_config) => {
968                                        let client_endpoint = &client_config.endpoint;
969                                        let requested_link_id =
970                                            if client_config.link_id.trim().is_empty() {
971                                                String::new()
972                                            } else {
973                                                client_config.link_id.clone()
974                                            };
975                                        let mut existing_conn_for_link_id = false;
976
977                                        if !requested_link_id.is_empty() {
978                                            match self
979                                                .resolve_connection_by_link_id(&requested_link_id)
980                                            {
981                                                Err(err) => {
982                                                    connection_success = false;
983                                                    connection_error_msg = err;
984                                                }
985                                                Ok(Some(conn_id)) => {
986                                                    existing_conn_for_link_id = true;
987                                                    self.inner
988                                                        .link_id_to_conn_id
989                                                        .write()
990                                                        .insert(requested_link_id.clone(), conn_id);
991                                                    info!(
992                                                        link_id = %requested_link_id,
993                                                        conn_id,
994                                                        "Connection already exists for link_id"
995                                                    );
996                                                }
997                                                Ok(None) => {}
998                                            }
999                                        }
1000
1001                                        if connection_success && !existing_conn_for_link_id {
1002                                            match self
1003                                                .inner
1004                                                .message_processor
1005                                                .connect(client_config.clone(), None, None)
1006                                                .await
1007                                            {
1008                                                Err(e) => {
1009                                                    connection_success = false;
1010                                                    connection_error_msg =
1011                                                        format!("Connection failed: {}", e);
1012                                                }
1013                                                Ok(conn_id) => {
1014                                                    if !requested_link_id.is_empty() {
1015                                                        self.inner
1016                                                            .link_id_to_conn_id
1017                                                            .write()
1018                                                            .insert(
1019                                                                requested_link_id.clone(),
1020                                                                conn_id.1,
1021                                                            );
1022                                                    }
1023                                                    info!(
1024                                                        endpoint = %client_endpoint, "Successfully created connection",
1025                                                    );
1026                                                }
1027                                            }
1028                                        }
1029                                    }
1030                                }
1031
1032                                // Add connection status
1033                                connections_status.push(v1::ConnectionAck {
1034                                    link_id: conn.link_id.clone(),
1035                                    success: connection_success,
1036                                    error_msg: connection_error_msg,
1037                                });
1038                            }
1039
1040                            // Process routes to set
1041                            for route in &config.routes_to_set {
1042                                let mut route_success = true;
1043                                let mut route_error_msg = String::new();
1044
1045                                let conn = self.resolve_route_connection(route);
1046
1047                                if let Ok(Some(conn)) = conn {
1048                                    let name = route.name.clone().unwrap();
1049                                    let source = name.clone().with_id(NameId::NULL_COMPONENT);
1050
1051                                    let msg = DataPlaneMessage::builder()
1052                                        .source(source.clone())
1053                                        .destination(name.clone())
1054                                        .identity("")
1055                                        .flags(SlimHeaderFlags::default().with_recv_from(conn))
1056                                        .build_subscribe()
1057                                        .unwrap();
1058
1059                                    match self.send_subscribe_message_with_ack(msg).await {
1060                                        Ok(subscription_id) => {
1061                                            // Store the subscription_id for later unsubscription
1062                                            self.inner
1063                                                .route_subscription_ids
1064                                                .lock()
1065                                                .insert((name.clone(), conn), subscription_id);
1066                                            info!(?route, "Successfully created route");
1067                                        }
1068                                        Err(err) => {
1069                                            route_success = false;
1070                                            route_error_msg =
1071                                                format!("Failed to subscribe: {}", err);
1072                                        }
1073                                    }
1074                                } else {
1075                                    route_success = false;
1076                                    route_error_msg = match conn {
1077                                        Ok(None) => {
1078                                            format!(
1079                                                "Connection with link_id {} not found",
1080                                                route.link_id.as_deref().unwrap_or("<none>")
1081                                            )
1082                                        }
1083                                        Err(err) => err,
1084                                        _ => "unknown connection lookup error".to_string(),
1085                                    };
1086                                }
1087
1088                                // Add route status
1089                                routes_status.push(v1::RouteAck {
1090                                    route: Some(route.clone()),
1091                                    success: route_success,
1092                                    error_msg: route_error_msg,
1093                                });
1094                            }
1095
1096                            // Process routes to delete
1097                            for route in &config.routes_to_delete {
1098                                let mut route_success = true;
1099                                let mut route_error_msg = String::new();
1100
1101                                let conn = self.resolve_route_connection(route);
1102
1103                                if let Ok(Some(conn)) = conn {
1104                                    let name = route.name.clone().unwrap();
1105                                    let source = name.clone().with_id(NameId::NULL_COMPONENT);
1106
1107                                    let msg = DataPlaneMessage::builder()
1108                                        .source(source.clone())
1109                                        .destination(name.clone())
1110                                        .identity("")
1111                                        .flags(SlimHeaderFlags::default().with_recv_from(conn))
1112                                        .build_unsubscribe()
1113                                        .unwrap();
1114
1115                                    let sub_id = self
1116                                        .inner
1117                                        .route_subscription_ids
1118                                        .lock()
1119                                        .remove(&(name.clone(), conn));
1120                                    let unsubscribe_result = match sub_id {
1121                                        Some(subscription_id) => {
1122                                            self.send_unsubscribe_message_with_ack(
1123                                                msg,
1124                                                subscription_id,
1125                                            )
1126                                            .await
1127                                        }
1128                                        None => {
1129                                            // No tracking ID in the map (e.g. after a CP restart
1130                                            // or for orphan cleanup from the reconciler).  Generate
1131                                            // a fresh ID — the datapath locates the subscription
1132                                            // by Name+connection, not by ID; the ID is only used
1133                                            // for ack correlation.
1134                                            let (ack_id, ack_rx) =
1135                                                self.inner.subscription_manager.register_ack();
1136                                            let mut fresh_msg = msg;
1137                                            fresh_msg.set_subscription_id(ack_id);
1138                                            if let Err(e) =
1139                                                self.send_control_message(fresh_msg).await
1140                                            {
1141                                                self.inner.subscription_manager.cancel_ack(ack_id);
1142                                                Err(format!("datapath send error: {}", e.chain()))
1143                                            } else {
1144                                                match tokio::time::timeout(
1145                                                    SUBSCRIPTION_ACK_TIMEOUT,
1146                                                    ack_rx,
1147                                                )
1148                                                .await
1149                                                {
1150                                                    Ok(Ok(Ok(()))) => Ok(()),
1151                                                    Ok(Ok(Err(err))) => Err(err.to_string()),
1152                                                    Ok(Err(_)) => {
1153                                                        Err("subscription ack channel closed"
1154                                                            .to_string())
1155                                                    }
1156                                                    Err(_) => {
1157                                                        self.inner
1158                                                            .subscription_manager
1159                                                            .cancel_ack(ack_id);
1160                                                        Err("subscription ack timed out"
1161                                                            .to_string())
1162                                                    }
1163                                                }
1164                                            }
1165                                        }
1166                                    };
1167                                    if let Err(err) = unsubscribe_result {
1168                                        route_success = false;
1169                                        route_error_msg = format!("Failed to unsubscribe: {}", err);
1170                                    } else {
1171                                        info!(?route, "Successfully deleted route");
1172                                    }
1173                                } else {
1174                                    route_success = false;
1175                                    route_error_msg = match conn {
1176                                        Ok(None) => {
1177                                            format!(
1178                                                "Connection with link_id {} not found",
1179                                                route.link_id.as_deref().unwrap_or("<none>")
1180                                            )
1181                                        }
1182                                        Err(err) => err,
1183                                        _ => "unknown connection lookup error".to_string(),
1184                                    };
1185                                }
1186
1187                                // Add route status (for deletion)
1188                                routes_status.push(v1::RouteAck {
1189                                    route: Some(route.clone()),
1190                                    success: route_success,
1191                                    error_msg: route_error_msg,
1192                                });
1193                            }
1194
1195                            (connections_status, routes_status)
1196                        };
1197
1198                        let config_ack = v1::ConfigurationCommandAck {
1199                            original_message_id: msg.message_id.clone(),
1200                            connections_status,
1201                            routes_status,
1202                        };
1203
1204                        let reply = ControlMessage {
1205                            message_id: uuid::Uuid::new_v4().to_string(),
1206                            payload: Some(Payload::ConfigCommandAck(config_ack)),
1207                        };
1208
1209                        if let Err(e) = tx.send(Ok(reply)).await {
1210                            error!(error = %e.chain(), "failed to send ConfigurationCommandAck");
1211                        }
1212
1213                        info!(
1214                            connections = %config.connections_to_create.len(),
1215                            connections_to_delete = %config.connections_to_delete.len(),
1216                            routes_to_set = %config.routes_to_set.len(),
1217                            routes_to_del = %config.routes_to_delete.len(),
1218                            "Processed ConfigurationCommand"
1219                        );
1220                    }
1221                    Payload::RouteListRequest(_) => {
1222                        const CHUNK_SIZE: usize = 100;
1223
1224                        let conn_table = self.inner.message_processor.connection_table();
1225                        let mut entries = Vec::new();
1226
1227                        self.inner.message_processor.subscription_table().for_each(
1228                            |name, id, local, remote, peer| {
1229                                let mut entry = RouteEntry {
1230                                    name: Some(name.clone().with_id(id)),
1231                                    ..Default::default()
1232                                };
1233
1234                                for &cid in local {
1235                                    entry.connections.push(ConnectionEntry {
1236                                        id: cid,
1237                                        connection_type: ConnectionType::Local as i32,
1238                                        config_data: "{}".to_string(),
1239                                        link_id: None,
1240                                        direction: ConnectionDirection::Outgoing as i32,
1241                                        peer_node_id: None,
1242                                    });
1243                                }
1244
1245                                for &cid in remote {
1246                                    if let Some(conn) = conn_table.get(cid) {
1247                                        entry.connections.push(ConnectionEntry {
1248                                            id: cid,
1249                                            connection_type: ConnectionType::Remote as i32,
1250                                            config_data: match conn.config_data() {
1251                                                Some(data) => serde_json::to_string(data)
1252                                                    .unwrap_or_else(|_| "{}".to_string()),
1253                                                None => "{}".to_string(),
1254                                            },
1255                                            link_id: conn.link_id(),
1256                                            direction: if conn.is_outgoing() {
1257                                                ConnectionDirection::Outgoing as i32
1258                                            } else {
1259                                                ConnectionDirection::Incoming as i32
1260                                            },
1261                                            peer_node_id: conn.peer_node_id().map(str::to_string),
1262                                        });
1263                                    } else {
1264                                        error!(%cid, "no connection entry for id");
1265                                    }
1266                                }
1267
1268                                for &cid in peer {
1269                                    if let Some(conn) = conn_table.get(cid) {
1270                                        entry.connections.push(ConnectionEntry {
1271                                            id: cid,
1272                                            connection_type: ConnectionType::Peer as i32,
1273                                            config_data: "{}".to_string(),
1274                                            link_id: conn.link_id(),
1275                                            direction: if conn.is_outgoing() {
1276                                                ConnectionDirection::Outgoing as i32
1277                                            } else {
1278                                                ConnectionDirection::Incoming as i32
1279                                            },
1280                                            peer_node_id: conn.peer_node_id().map(str::to_string),
1281                                        });
1282                                    } else {
1283                                        error!(%cid, "no connection entry for id (peer)");
1284                                    }
1285                                }
1286                                entries.push(entry);
1287                            },
1288                        );
1289
1290                        let chunks: Vec<_> = entries.chunks(CHUNK_SIZE).collect();
1291                        if chunks.is_empty() {
1292                            let resp = ControlMessage {
1293                                message_id: uuid::Uuid::new_v4().to_string(),
1294                                payload: Some(Payload::RouteListResponse(RouteListResponse {
1295                                    original_message_id: msg.message_id.clone(),
1296                                    entries: vec![],
1297                                    done: true,
1298                                })),
1299                            };
1300                            if let Err(e) = tx.send(Ok(resp)).await {
1301                                error!(error = %e.chain(), "failed to send route list response");
1302                            }
1303                        } else {
1304                            let n = chunks.len();
1305                            for (i, chunk) in chunks.into_iter().enumerate() {
1306                                let resp = ControlMessage {
1307                                    message_id: uuid::Uuid::new_v4().to_string(),
1308                                    payload: Some(Payload::RouteListResponse(RouteListResponse {
1309                                        original_message_id: msg.message_id.clone(),
1310                                        entries: chunk.to_vec(),
1311                                        done: i + 1 == n,
1312                                    })),
1313                                };
1314                                if let Err(e) = tx.send(Ok(resp)).await {
1315                                    error!(error = %e.chain(), "failed to send route batch");
1316                                    break;
1317                                }
1318                            }
1319                        }
1320                    }
1321                    Payload::ConnectionListRequest(_) => {
1322                        let mut all_entries = Vec::new();
1323                        self.inner
1324                            .message_processor
1325                            .connection_table()
1326                            .for_each(|id, conn| {
1327                                debug!(
1328                                    conn_id = id,
1329                                    local_addr = ?conn.local_addr(),
1330                                    remote_addr = ?conn.remote_addr(),
1331                                    is_outgoing = conn.is_outgoing(),
1332                                    link_id = ?conn.link_id(),
1333                                    "connection entry",
1334                                );
1335                                all_entries.push(ConnectionEntry {
1336                                    id,
1337                                    connection_type: ConnectionType::Remote as i32,
1338                                    config_data: match conn.config_data() {
1339                                        Some(data) => serde_json::to_string(data)
1340                                            .unwrap_or_else(|_| "{}".to_string()),
1341                                        None => "{}".to_string(),
1342                                    },
1343                                    link_id: conn.link_id(),
1344                                    direction: if conn.is_outgoing() {
1345                                        ConnectionDirection::Outgoing as i32
1346                                    } else {
1347                                        ConnectionDirection::Incoming as i32
1348                                    },
1349                                    peer_node_id: conn.peer_node_id().map(str::to_string),
1350                                });
1351                            });
1352
1353                        const CHUNK_SIZE: usize = 100;
1354                        let chunks: Vec<_> = all_entries.chunks(CHUNK_SIZE).collect();
1355                        if chunks.is_empty() {
1356                            let resp = ControlMessage {
1357                                message_id: uuid::Uuid::new_v4().to_string(),
1358                                payload: Some(Payload::ConnectionListResponse(
1359                                    ConnectionListResponse {
1360                                        original_message_id: msg.message_id.clone(),
1361                                        entries: vec![],
1362                                        done: true,
1363                                    },
1364                                )),
1365                            };
1366                            if let Err(e) = tx.send(Ok(resp)).await {
1367                                error!(error = %e.chain(), "failed to send connection list response");
1368                            }
1369                        } else {
1370                            let n = chunks.len();
1371                            for (i, chunk) in chunks.into_iter().enumerate() {
1372                                let resp = ControlMessage {
1373                                    message_id: uuid::Uuid::new_v4().to_string(),
1374                                    payload: Some(Payload::ConnectionListResponse(
1375                                        ConnectionListResponse {
1376                                            original_message_id: msg.message_id.clone(),
1377                                            entries: chunk.to_vec(),
1378                                            done: i + 1 == n,
1379                                        },
1380                                    )),
1381                                };
1382                                if let Err(e) = tx.send(Ok(resp)).await {
1383                                    error!(error = %e.chain(), "failed to send connection list batch");
1384                                    break;
1385                                }
1386                            }
1387                        }
1388                    }
1389                    Payload::RegisterNodeRequest(_) => {
1390                        error!("received a register node request");
1391                    }
1392                    Payload::DeregisterNodeRequest(_) => {
1393                        error!("received a deregister node request");
1394                    }
1395                    _ => {
1396                        // received unsupported message type, do nothing
1397                        debug!("received unsupported message type from control - ignoring");
1398                    }
1399                }
1400            }
1401            None => {
1402                error!(
1403                    message_id = %msg.message_id,
1404                    "received control message with no payload",
1405                );
1406            }
1407        }
1408
1409        Ok(())
1410    }
1411
1412    async fn handle_subscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1413        let mut sub_vec = vec![];
1414
1415        let cmd = v1::Route {
1416            name: Some(dst),
1417            link_id: None,
1418            direction: None,
1419        };
1420
1421        sub_vec.push(cmd);
1422
1423        let ctrl = ControlMessage {
1424            message_id: uuid::Uuid::new_v4().to_string(),
1425            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1426                connections_to_create: vec![],
1427                connections_to_delete: vec![],
1428                routes_to_set: sub_vec,
1429                routes_to_delete: vec![],
1430                reconcile: false,
1431            })),
1432        };
1433
1434        return self.send_or_queue_notification(ctrl, clients).await;
1435    }
1436
1437    async fn handle_unsubscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1438        let mut unsub_vec = vec![];
1439
1440        let cmd = v1::Route {
1441            name: Some(dst),
1442            link_id: None,
1443            direction: None,
1444        };
1445
1446        unsub_vec.push(cmd);
1447
1448        let ctrl = ControlMessage {
1449            message_id: uuid::Uuid::new_v4().to_string(),
1450            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1451                connections_to_create: vec![],
1452                connections_to_delete: vec![],
1453                routes_to_set: vec![],
1454                routes_to_delete: unsub_vec,
1455                reconcile: false,
1456            })),
1457        };
1458
1459        return self.send_or_queue_notification(ctrl, clients).await;
1460    }
1461
1462    /// Send a subscribe message and await the ack. Returns the subscription_id.
1463    async fn send_subscribe_message_with_ack(
1464        &self,
1465        mut msg: DataPlaneMessage,
1466    ) -> Result<u64, String> {
1467        let (ack_id, ack_rx) = self.inner.subscription_manager.register_ack();
1468        msg.set_subscription_id(ack_id);
1469
1470        if let Err(e) = self.send_control_message(msg).await {
1471            self.inner.subscription_manager.cancel_ack(ack_id);
1472            return Err(format!("datapath send error: {}", e.chain()));
1473        }
1474
1475        match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1476            Ok(Ok(Ok(()))) => Ok(ack_id),
1477            Ok(Ok(Err(err))) => Err(err.to_string()),
1478            Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1479            Err(_) => {
1480                self.inner.subscription_manager.cancel_ack(ack_id);
1481                Err("subscription ack timed out".to_string())
1482            }
1483        }
1484    }
1485
1486    /// Send an unsubscribe message with a given subscription_id and await the ack.
1487    async fn send_unsubscribe_message_with_ack(
1488        &self,
1489        mut msg: DataPlaneMessage,
1490        subscription_id: u64,
1491    ) -> Result<(), String> {
1492        let ack_rx = self
1493            .inner
1494            .subscription_manager
1495            .register_ack_with_id(subscription_id);
1496        msg.set_subscription_id(subscription_id);
1497
1498        if let Err(e) = self.send_control_message(msg).await {
1499            self.inner.subscription_manager.cancel_ack(subscription_id);
1500            return Err(format!("datapath send error: {}", e.chain()));
1501        }
1502
1503        match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1504            Ok(Ok(Ok(()))) => Ok(()),
1505            Ok(Ok(Err(err))) => Err(err.to_string()),
1506            Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1507            Err(_) => {
1508                self.inner.subscription_manager.cancel_ack(subscription_id);
1509                Err("subscription ack timed out".to_string())
1510            }
1511        }
1512    }
1513
1514    /// Send a control message to SLIM.
1515    async fn send_control_message(&self, msg: DataPlaneMessage) -> Result<(), ControllerError> {
1516        self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
1517            error!(error = %e.chain(), "error sending message into datapath");
1518            ControllerError::Datapath(slim_datapath::errors::DataPathError::ConnectionError)
1519        })
1520    }
1521
1522    /// Send notification to control plane or queue it if no connection is available.
1523    ///
1524    /// Uses `try_send` to avoid blocking the caller when the bounded channel is
1525    /// full (e.g. due to HTTP/2 back-pressure).  Blocking here would stall the
1526    /// data-plane listener task, preventing it from resolving subscription acks
1527    /// that the control-message handler is waiting on — a deadlock.
1528    async fn send_or_queue_notification(&self, ctrl_msg: ControlMessage, clients: &[ClientConfig]) {
1529        let mut sent = false;
1530
1531        for c in clients {
1532            let tx = match self.inner.tx_channels.read().get(&c.endpoint) {
1533                Some(tx) => tx.clone(),
1534                None => continue,
1535            };
1536
1537            match tx.try_send(Ok(ctrl_msg.clone())) {
1538                Ok(()) => {
1539                    sent = true;
1540                }
1541                Err(mpsc::error::TrySendError::Full(_)) => {
1542                    debug!(
1543                        endpoint = %c.endpoint,
1544                        "channel full, queuing notification instead of blocking"
1545                    );
1546                }
1547                Err(mpsc::error::TrySendError::Closed(_)) => {
1548                    debug!(
1549                        endpoint = %c.endpoint,
1550                        "channel closed, queuing notification"
1551                    );
1552                }
1553            }
1554        }
1555
1556        if !sent {
1557            let mut queue = self.inner.pending_notifications.lock();
1558            if queue.len() >= MAX_QUEUED_NOTIFICATIONS {
1559                queue.pop_front();
1560                debug!("queue full, removed oldest notification");
1561            }
1562            queue.push_back(ctrl_msg);
1563        }
1564    }
1565
1566    /// Get a drain watch clone to pass to a task
1567    fn drain_watch(&self) -> Result<drain::Watch, ControllerError> {
1568        self.inner
1569            .drain_watch
1570            .read()
1571            .clone()
1572            .ok_or(ControllerError::AlreadyStopped)
1573    }
1574
1575    /// Send all queued subscription notifications when connection is restored.
1576    async fn send_queued_notifications(
1577        &self,
1578        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
1579        endpoint: &str,
1580    ) {
1581        let notifications = {
1582            let mut queue = self.inner.pending_notifications.lock();
1583            if queue.is_empty() {
1584                return;
1585            }
1586            queue.drain(..).collect::<Vec<_>>()
1587        };
1588
1589        if notifications.is_empty() {
1590            return;
1591        }
1592
1593        debug!(
1594            "sending {} queued subscription notifications to {}",
1595            notifications.len(),
1596            endpoint
1597        );
1598
1599        let mut failed_notifications = Vec::new();
1600        for notification in notifications {
1601            if let Err(e) = tx.send(Ok(notification)).await {
1602                error!(
1603                    error = %e.chain(),
1604                    %endpoint,
1605                    "failed to send queued notification to control plane",
1606                );
1607
1608                // we can unwrap here because we know we sent a Ok(ControlMessage)
1609                failed_notifications.push(e.0.unwrap());
1610            }
1611        }
1612
1613        // Re-queue any failed notifications
1614        if !failed_notifications.is_empty() {
1615            self.inner
1616                .pending_notifications
1617                .lock()
1618                .extend(failed_notifications);
1619        }
1620    }
1621
    /// Spawn the task that drives one bidirectional control-plane stream.
    ///
    /// `config` is `Some` for streams this node opened as a client (see
    /// `connect`) and `None` for streams accepted by the gRPC server
    /// (`open_control_channel`). `stream` is the incoming half; `tx` feeds the
    /// outgoing half. The spawned task:
    ///
    /// 1. Snapshots the connection table and `route_subscription_ids` into a
    ///    `RegisterNodeRequest` and sends it on `tx` (client side only).
    /// 2. Flushes queued notifications via `send_queued_notifications`.
    /// 3. Client side only: waits up to 10 s for a `RegisterNodeResponse`. If
    ///    the response carries connections or routes, it applies them through
    ///    `handle_new_control_message` as a `ConfigCommand` with
    ///    `reconcile: true`.
    /// 4. Hands every further incoming message to `handle_new_control_message`.
    ///    This continues until the stream errors or ends, `cancellation_token`
    ///    fires, or the drain signal arrives.
    /// 5. If the stream errored or ended (not on cancellation or drain) and
    ///    this is a client-side stream, calls `connect` again and stores the
    ///    new sender in `tx_channels`.
    ///
    /// # Errors
    ///
    /// Returns the error from `drain_watch` (`ControllerError::AlreadyStopped`)
    /// before spawning anything if the drain watch is no longer available.
    fn process_control_message_stream(
        &self,
        config: Option<ClientConfig>,
        mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
        tx: mpsc::Sender<Result<ControlMessage, Status>>,
        cancellation_token: CancellationToken,
    ) -> Result<JoinHandle<()>, ControllerError> {
        let this = self.clone();
        let watch = self.drain_watch()?;

        let handle = tokio::spawn(async move {
            // Endpoint label for logging; server-side streams have no config,
            // so they are logged as "unknown".
            let endpoint = config
                .as_ref()
                .map(|c| c.endpoint.clone())
                .unwrap_or_else(|| "unknown".to_string());
            info!(%endpoint, "connected to control plane");

            // Set only when the stream errors or ends; cancellation and drain
            // leave it false, so no reconnect happens in those cases.
            let mut retry_connect = false;

            // Snapshot of the current datapath connections, reported in the
            // register request.
            let mut active_connections = Vec::new();
            this.inner
                .message_processor
                .connection_table()
                .for_each(|id, conn| {
                    active_connections.push(v1::ConnectionEntry {
                        id,
                        connection_type: v1::ConnectionType::Remote as i32,
                        config_data: match conn.config_data() {
                            Some(data) => {
                                serde_json::to_string(data).unwrap_or_else(|_| "{}".to_string())
                            }
                            None => "{}".to_string(),
                        },
                        link_id: conn.link_id(),
                        direction: if conn.is_outgoing() {
                            v1::ConnectionDirection::Outgoing as i32
                        } else {
                            v1::ConnectionDirection::Incoming as i32
                        },
                        peer_node_id: conn.peer_node_id().map(str::to_string),
                    });
                });

            // Routes with an active subscription, each tagged with the link id
            // of its connection. The link id is None when the conn id has no
            // entry in `link_id_to_conn_id`.
            let active_routes = {
                // Invert link_id -> conn_id so routes (keyed by conn id) can
                // look up their link id.
                let conn_id_to_link_id: HashMap<u64, String> = this
                    .inner
                    .link_id_to_conn_id
                    .read()
                    .iter()
                    .map(|(lid, cid)| (*cid, lid.clone()))
                    .collect();
                this.inner
                    .route_subscription_ids
                    .lock()
                    .iter()
                    .map(|((name, conn_id), _sub_id)| v1::Route {
                        name: Some(name.clone()),
                        link_id: conn_id_to_link_id.get(conn_id).cloned(),
                        direction: None,
                    })
                    .collect::<Vec<_>>()
            };

            // NOTE(review): this is built for server-side streams too, but it is
            // only sent when `config` is Some.
            let register_request = ControlMessage {
                message_id: uuid::Uuid::new_v4().to_string(),
                payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
                    node_id: this.inner.id.clone(),
                    group_name: this.inner.group_name.clone(),
                    connection_details: this.inner.connection_details.clone(),
                    connections: active_connections,
                    routes: active_routes,
                })),
            };

            // send register request if client
            if config.is_some()
                && let Err(e) = tx.send(Ok(register_request)).await
            {
                error!(error = %e.chain(), "failed to send register request");
                return;
            }

            // Send queued notifications after the register request so that
            // RegisterNodeRequest is always the first message on the stream.
            this.send_queued_notifications(&tx, &endpoint).await;

            // Wait for RegisterNodeResponse from control plane before processing commands.
            // Any other message received during this wait is discarded.
            // NOTE(review): only `cancellation_token` is observed here, not the
            // drain signal, so a drain can be delayed by up to the 10 s timeout.
            if config.is_some() {
                let registration_result = tokio::time::timeout(
                    Duration::from_secs(10),
                    async {
                        loop {
                            tokio::select! {
                                next = stream.next() => {
                                    match next {
                                        Some(Ok(msg)) => {
                                            if let Some(Payload::RegisterNodeResponse(resp)) = msg.payload {
                                                if resp.success {
                                                    info!(
                                                        connections = resp.connections.len(),
                                                        routes = resp.routes.len(),
                                                        "registration acknowledged by control plane"
                                                    );
                                                    return Ok(resp);
                                                } else {
                                                    return Err("registration rejected by control plane".to_string());
                                                }
                                            }
                                        }
                                        Some(Err(_)) => return Err("stream error waiting for registration response".to_string()),
                                        None => return Err("stream closed before registration response".to_string()),
                                    }
                                }
                                _ = cancellation_token.cancelled() => {
                                    return Err("cancelled while waiting for registration response".to_string());
                                }
                            }
                        }
                    }
                ).await;

                match registration_result {
                    Ok(Ok(resp)) => {
                        // Apply initial desired state from the control plane.
                        if !resp.connections.is_empty() || !resp.routes.is_empty() {
                            let init_cmd = ControlMessage {
                                message_id: uuid::Uuid::new_v4().to_string(),
                                payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
                                    connections_to_create: resp.connections,
                                    routes_to_set: resp.routes,
                                    routes_to_delete: vec![],
                                    connections_to_delete: vec![],
                                    reconcile: true,
                                })),
                            };
                            if let Err(e) = this.handle_new_control_message(init_cmd, &tx).await {
                                error!(error = %e.chain(), "failed to apply initial state from registration");
                            }
                        }
                    }
                    // NOTE(review): both failure arms return without setting
                    // `retry_connect`. A client stream that fails the handshake
                    // (including transient stream errors and timeouts) is
                    // therefore never reconnected by this task. Confirm this
                    // is intended.
                    Ok(Err(e)) => {
                        error!(%e, "registration handshake failed");
                        return;
                    }
                    Err(_) => {
                        error!("registration handshake timed out");
                        return;
                    }
                }
            }

            // Pinned once so the same drain future is polled across iterations.
            let mut drain_fut = std::pin::pin!(watch.clone().signaled());

            loop {
                tokio::select! {
                    next = stream.next() => {
                        match next {
                            Some(Ok(msg)) => {
                                if let Err(e) = this.handle_new_control_message(msg, &tx).await {
                                    error!(error = %e.chain(), "error processing incoming control message");
                                }
                            }
                            Some(Err(e)) => {
                                if let Some(io_err) = Self::match_for_io_error(&e) {
                                    if io_err.kind() == std::io::ErrorKind::BrokenPipe {
                                        info!("connection closed by peer");
                                    } else {
                                        // Handle other IO errors (ConnectionAborted, etc.)
                                        error!(
                                            error = %e.chain(),
                                            io_error_kind = ?io_err.kind(),
                                            "IO error receiving control messages"
                                        );
                                    }
                                } else {
                                    // Handle non-IO errors (e.g., gRPC Canceled, Unavailable, etc.)
                                    error!(error = %e.chain(), "error receiving control messages");
                                }

                                retry_connect = true;
                                break;
                            }
                            None => {
                                debug!("end of stream");
                                retry_connect = true;
                                break;
                            }
                        }
                    }
                    _ = cancellation_token.cancelled() => {
                        debug!("shutting down stream on cancellation token");
                        break;
                    }
                    _ = &mut drain_fut => {
                        debug!("shutting down stream on drain");
                        break;
                    }
                }
            }

            info!(%endpoint, "control plane stream closed");

            // Only client-side streams (config is Some) are re-established. On
            // reconnect failure the error is logged and this task ends; no
            // further retry happens here.
            // NOTE(review): the reconnect is attempted immediately; no delay or
            // backoff is visible at this level.
            if retry_connect && let Some(config) = config {
                info!(%config.endpoint, "retrying connection to control plane");
                this.connect(config.clone(), cancellation_token)
                    .await
                    .map_or_else(
                        |e| {
                            error!(error = %e.chain(), "failed to reconnect to control plane");
                        },
                        |tx| {
                            info!(%config.endpoint, "reconnected to control plane");

                            this.inner
                                .tx_channels
                                .write()
                                .insert(config.endpoint.clone(), tx);
                        },
                    )
            }
        });

        Ok(handle)
    }
1848
1849    /// Connect to the control plane using the provided client configuration.
1850    /// This function attempts to establish a connection to the control plane and returns a sender for control messages.
1851    /// It retries the connection a specified number of times if it fails.
1852    async fn connect(
1853        &self,
1854        config: ClientConfig,
1855        cancellation_token: CancellationToken,
1856    ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
1857        info!(%config.endpoint, "connecting to control plane");
1858
1859        // Reset connection state before establishing a new session. The CP
1860        // will send fresh ConfigCommands after re-registration to rebuild
1861        // these maps.
1862        self.inner.route_subscription_ids.lock().clear();
1863        self.inner.link_id_to_conn_id.write().clear();
1864
1865        // Obtain drain watch to handle graceful shutdown during connection
1866        let watch = self.drain_watch()?;
1867
1868        let connect_fut = async {
1869            let channel = match config.to_channel().await? {
1870                TransportChannel::Grpc(c) => c,
1871                TransportChannel::Websocket(_) => {
1872                    return Err(ControllerError::ConfigError(
1873                        slim_config::errors::ConfigError::GrpcChannelUnsupportedTransport,
1874                    ));
1875                }
1876            };
1877
1878            let mut client = ControllerServiceClient::new(channel.clone());
1879            let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1880            let out_stream = ReceiverStream::new(rx).filter_map(|res| match res {
1881                Ok(msg) => Some(msg),
1882                Err(e) => {
1883                    error!(error = %e, "dropping outbound control message due to error");
1884                    None
1885                }
1886            });
1887            let stream = client
1888                .open_control_channel(Request::new(out_stream))
1889                .await?;
1890            Ok((tx, stream))
1891        };
1892
1893        let (tx, stream) = tokio::select! {
1894            result = connect_fut => { result? }
1895            _ = cancellation_token.cancelled() => {
1896                debug!("connection cancelled during setup");
1897                return Err(ControllerError::Canceled);
1898            }
1899            _ = watch.signaled() => {
1900                debug!("drain signal received during connection setup");
1901                return Err(ControllerError::Canceled);
1902            }
1903        };
1904
1905        // start processing the incoming stream
1906        let endpoint_key = config.endpoint.clone();
1907        let handle = self.process_control_message_stream(
1908            Some(config),
1909            stream.into_inner(),
1910            tx.clone(),
1911            cancellation_token.clone(),
1912        )?;
1913        self.inner
1914            .stream_handles
1915            .lock()
1916            .insert(endpoint_key, handle);
1917
1918        // return the sender
1919        Ok(tx)
1920    }
1921
1922    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1923        let mut err: &(dyn std::error::Error + 'static) = err_status;
1924
1925        loop {
1926            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1927                return Some(io_err);
1928            }
1929
1930            // h2::Error do not expose std::io::Error with `source()`
1931            // https://github.com/hyperium/h2/pull/462
1932            if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1933                && let Some(io_err) = h2_err.get_io()
1934            {
1935                return Some(io_err);
1936            }
1937
1938            err = err.source()?;
1939        }
1940    }
1941}
1942
1943#[tonic::async_trait]
1944impl GrpcControllerService for ControllerService {
1945    type OpenControlChannelStream =
1946        Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
1947
1948    async fn open_control_channel(
1949        &self,
1950        request: Request<tonic::Streaming<ControlMessage>>,
1951    ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
1952        // Get the remote endpoint from the request metadata
1953        let remote_endpoint = request
1954            .remote_addr()
1955            .map(|addr| addr.to_string())
1956            .unwrap_or_else(|| "unknown".to_string());
1957
1958        let stream = request.into_inner();
1959        let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1960
1961        let cancellation_token = CancellationToken::new();
1962
1963        // Server-side connections don't initiate operations requiring acks, so no timer channel needed
1964        let handle = self
1965            .process_control_message_stream(None, stream, tx.clone(), cancellation_token.clone())
1966            .map_err(|e| {
1967                error!(error = %e.chain(), "error processing control message stream");
1968                Status::unavailable("failed to process control message stream")
1969            })?;
1970        self.inner
1971            .stream_handles
1972            .lock()
1973            .insert(remote_endpoint.clone(), handle);
1974
1975        self.inner
1976            .tx_channels
1977            .write()
1978            .insert(remote_endpoint.clone(), tx);
1979
1980        if let Some(old_token) = self
1981            .inner
1982            .cancellation_tokens
1983            .write()
1984            .insert(remote_endpoint.clone(), cancellation_token)
1985        {
1986            old_token.cancel();
1987        }
1988
1989        let out_stream = ReceiverStream::new(rx);
1990        Ok(Response::new(
1991            Box::pin(out_stream) as Self::OpenControlChannelStream
1992        ))
1993    }
1994}
1995
1996#[cfg(test)]
1997mod tests {
1998    use super::*;
1999    use tracing_test::traced_test;
2000
2001    async fn setup_control_planes(
2002        server_endpoint: &str,
2003        server_name: &str,
2004        client_name: &str,
2005    ) -> (ControlPlane, ControlPlane, ClientConfig) {
2006        let server_config = ServerConfig::with_endpoint(server_endpoint)
2007            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
2008        let client_config = ClientConfig::with_endpoint(&format!("http://{}", server_endpoint))
2009            .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());
2010
2011        let message_processor_server = MessageProcessor::new();
2012        let message_processor_client = MessageProcessor::new();
2013
2014        let control_plane_server = ControlPlane::new(ControlPlaneSettings {
2015            id: server_name.to_string(),
2016            group_name: None,
2017            servers: vec![server_config.clone()],
2018            clients: vec![],
2019            message_processor: message_processor_server,
2020            connection_details: vec![from_server_config(&server_config)],
2021        });
2022
2023        let control_plane_client = ControlPlane::new(ControlPlaneSettings {
2024            id: client_name.to_string(),
2025            group_name: None,
2026            servers: vec![],
2027            clients: vec![client_config.clone()],
2028            message_processor: message_processor_client,
2029            connection_details: vec![],
2030        });
2031
2032        (control_plane_server, control_plane_client, client_config)
2033    }
2034
2035    #[tokio::test]
2036    #[traced_test]
2037    async fn test_end_to_end() {
2038        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2039            setup_control_planes(
2040                "127.0.0.1:50051",
2041                "test-server-instance",
2042                "test-client-instance",
2043            )
2044            .await;
2045
2046        control_plane_server.run().await.unwrap();
2047        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2048        control_plane_client.run().await.unwrap();
2049        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2050
2051        assert!(logs_contain("received a register node request"));
2052    }
2053
2054    #[tokio::test]
2055    #[traced_test]
2056    async fn test_subscription_notification_queue_drain() {
2057        // Reuse common setup with a different port to avoid clash with other tests.
2058        let (mut control_plane_server, mut control_plane_client, client_config) =
2059            setup_control_planes(
2060                "127.0.0.1:50061",
2061                "queue-drain-server",
2062                "queue-drain-client",
2063            )
2064            .await;
2065
2066        let controller = control_plane_client.controller.clone();
2067        assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2068
2069        const N: usize = 5;
2070        for i in 0..N {
2071            let ctrl_msg = ControlMessage {
2072                message_id: uuid::Uuid::new_v4().to_string(),
2073                payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2074                    connections_to_create: vec![],
2075                    connections_to_delete: vec![],
2076                    routes_to_set: vec![v1::Route {
2077                        name: Some(
2078                            ProtoName::from_strings(["queued", "sub", &format!("name-{i}")])
2079                                .with_id(i as u128),
2080                        ),
2081                        link_id: None,
2082                        direction: None,
2083                    }],
2084                    routes_to_delete: vec![],
2085                    reconcile: false,
2086                })),
2087            };
2088            controller
2089                .send_or_queue_notification(ctrl_msg, std::slice::from_ref(&client_config))
2090                .await;
2091        }
2092        assert_eq!(controller.inner.pending_notifications.lock().len(), N);
2093
2094        control_plane_server.run().await.expect("server run failed");
2095        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2096        control_plane_client.run().await.expect("client run failed");
2097        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2098
2099        assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2100        assert!(
2101            logs_contain(&format!("sending {} queued subscription notifications", N)),
2102            "Expected log about sending queued subscription notifications"
2103        );
2104
2105        drop(controller);
2106        drop(control_plane_server);
2107        drop(control_plane_client);
2108    }
2109
2110    #[tokio::test]
2111    #[traced_test]
2112    async fn test_delete_connection_by_link_id_success_ack() {
2113        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2114            setup_control_planes(
2115                "127.0.0.1:50081",
2116                "delete-linkid-server",
2117                "delete-linkid-client",
2118            )
2119            .await;
2120
2121        control_plane_server.run().await.unwrap();
2122        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2123        control_plane_client.run().await.unwrap();
2124        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2125
2126        let controller = control_plane_client.controller.clone();
2127        let link_id = "test-delete-link-id".to_string();
2128
2129        // Insert a pre-negotiated remote connection with a known link_id for testing.
2130        let (tx, _rx) = tokio::sync::mpsc::channel(16);
2131        let conn = slim_datapath::connection::Connection::new(
2132            slim_datapath::tables::ConnType::Remote,
2133            slim_datapath::connection::Channel::Server(tx),
2134        )
2135        .with_negotiation(&link_id, "1.0.0");
2136        controller
2137            .inner
2138            .message_processor
2139            .forwarder()
2140            .on_connection_established(conn, None)
2141            .unwrap();
2142
2143        let ctrl_msg = ControlMessage {
2144            message_id: uuid::Uuid::new_v4().to_string(),
2145            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2146                connections_to_create: vec![],
2147                connections_to_delete: vec![link_id.clone()],
2148                routes_to_set: vec![],
2149                routes_to_delete: vec![],
2150                reconcile: false,
2151            })),
2152        };
2153        let (tx, mut rx) = mpsc::channel(1);
2154        controller
2155            .handle_new_control_message(ctrl_msg, &tx)
2156            .await
2157            .expect("config command must be handled");
2158
2159        let ack_msg = rx
2160            .recv()
2161            .await
2162            .expect("expected ack message")
2163            .expect("ack should be ok");
2164        let ack = match ack_msg.payload {
2165            Some(Payload::ConfigCommandAck(ack)) => ack,
2166            _ => panic!("expected ConfigCommandAck payload"),
2167        };
2168        assert_eq!(ack.connections_status.len(), 1);
2169        assert_eq!(ack.connections_status[0].link_id, link_id);
2170        assert!(ack.connections_status[0].success);
2171    }
2172
2173    #[tokio::test]
2174    #[traced_test]
2175    async fn test_delete_connection_by_link_id_unknown_fails_ack() {
2176        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2177            "127.0.0.1:50082",
2178            "delete-linkid-server-unknown",
2179            "delete-linkid-client-unknown",
2180        )
2181        .await;
2182
2183        let controller = control_plane_client.controller.clone();
2184        let ctrl_msg = ControlMessage {
2185            message_id: uuid::Uuid::new_v4().to_string(),
2186            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2187                connections_to_create: vec![],
2188                connections_to_delete: vec!["unknown-link-id".to_string()],
2189                routes_to_set: vec![],
2190                routes_to_delete: vec![],
2191                reconcile: false,
2192            })),
2193        };
2194        let (tx, mut rx) = mpsc::channel(1);
2195        controller
2196            .handle_new_control_message(ctrl_msg, &tx)
2197            .await
2198            .expect("config command must be handled");
2199
2200        let ack_msg = rx
2201            .recv()
2202            .await
2203            .expect("expected ack message")
2204            .expect("ack should be ok");
2205        let ack = match ack_msg.payload {
2206            Some(Payload::ConfigCommandAck(ack)) => ack,
2207            _ => panic!("expected ConfigCommandAck payload"),
2208        };
2209        assert_eq!(ack.connections_status.len(), 1);
2210        assert_eq!(ack.connections_status[0].link_id, "unknown-link-id");
2211        assert!(!ack.connections_status[0].success);
2212        assert!(ack.connections_status[0].error_msg.contains("not found"));
2213
2214        drop(control_plane_server);
2215    }
2216
2217    #[tokio::test]
2218    #[traced_test]
2219    async fn test_create_connection_with_existing_link_id_reuses_connection_ack() {
2220        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2221            setup_control_planes(
2222                "127.0.0.1:50083",
2223                "create-linkid-server",
2224                "create-linkid-client",
2225            )
2226            .await;
2227
2228        control_plane_server.run().await.unwrap();
2229        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2230        control_plane_client.run().await.unwrap();
2231        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2232
2233        let controller = control_plane_client.controller.clone();
2234        let link_id = "test-create-link-id".to_string();
2235
2236        // Insert a pre-negotiated remote connection with a known link_id for testing.
2237        let (tx, _rx) = tokio::sync::mpsc::channel(16);
2238        let conn = slim_datapath::connection::Connection::new(
2239            slim_datapath::tables::ConnType::Remote,
2240            slim_datapath::connection::Channel::Server(tx),
2241        )
2242        .with_negotiation(&link_id, "1.0.0");
2243        controller
2244            .inner
2245            .message_processor
2246            .forwarder()
2247            .on_connection_established(conn, None)
2248            .unwrap();
2249
2250        let endpoint = "http://127.0.0.1:59999";
2251        let connection_config = serde_json::json!({
2252            "endpoint": endpoint,
2253            "link_id": link_id
2254        })
2255        .to_string();
2256
2257        let ctrl_msg = ControlMessage {
2258            message_id: uuid::Uuid::new_v4().to_string(),
2259            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2260                connections_to_create: vec![v1::Connection {
2261                    link_id: "reuse-existing-link".to_string(),
2262                    config_data: connection_config,
2263                }],
2264                connections_to_delete: vec![],
2265                routes_to_set: vec![],
2266                routes_to_delete: vec![],
2267                reconcile: false,
2268            })),
2269        };
2270
2271        let (tx, mut rx) = mpsc::channel(1);
2272        controller
2273            .handle_new_control_message(ctrl_msg, &tx)
2274            .await
2275            .expect("config command must be handled");
2276
2277        let ack_msg = rx
2278            .recv()
2279            .await
2280            .expect("expected ack message")
2281            .expect("ack should be ok");
2282        let ack = match ack_msg.payload {
2283            Some(Payload::ConfigCommandAck(ack)) => ack,
2284            _ => panic!("expected ConfigCommandAck payload"),
2285        };
2286        assert_eq!(ack.connections_status.len(), 1);
2287        assert_eq!(ack.connections_status[0].link_id, "reuse-existing-link");
2288        assert!(ack.connections_status[0].success);
2289
2290        assert!(
2291            controller
2292                .inner
2293                .link_id_to_conn_id
2294                .read()
2295                .contains_key(&link_id),
2296            "expected link_id to be mapped to reused connection id"
2297        );
2298    }
2299
2300    #[tokio::test]
2301    #[traced_test]
2302    async fn test_subscription_set_unknown_link_id_fails_ack() {
2303        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2304            "127.0.0.1:50084",
2305            "sub-linkid-server-unknown",
2306            "sub-linkid-client-unknown",
2307        )
2308        .await;
2309
2310        let controller = control_plane_client.controller.clone();
2311        let ctrl_msg = ControlMessage {
2312            message_id: uuid::Uuid::new_v4().to_string(),
2313            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2314                connections_to_create: vec![],
2315                connections_to_delete: vec![],
2316                routes_to_set: vec![v1::Route {
2317                    name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2318                    link_id: Some("missing-link-id".to_string()),
2319                    direction: None,
2320                }],
2321                routes_to_delete: vec![],
2322                reconcile: false,
2323            })),
2324        };
2325        let (tx, mut rx) = mpsc::channel(1);
2326        controller
2327            .handle_new_control_message(ctrl_msg, &tx)
2328            .await
2329            .expect("config command must be handled");
2330
2331        let ack_msg = rx
2332            .recv()
2333            .await
2334            .expect("expected ack message")
2335            .expect("ack should be ok");
2336        let ack = match ack_msg.payload {
2337            Some(Payload::ConfigCommandAck(ack)) => ack,
2338            _ => panic!("expected ConfigCommandAck payload"),
2339        };
2340
2341        assert_eq!(ack.routes_status.len(), 1);
2342        assert!(!ack.routes_status[0].success);
2343        assert!(
2344            ack.routes_status[0]
2345                .error_msg
2346                .contains("Connection with link_id missing-link-id not found")
2347        );
2348
2349        drop(control_plane_server);
2350    }
2351
2352    #[tokio::test]
2353    #[traced_test]
2354    async fn test_create_connection_invalid_config_fails_ack() {
2355        let (_control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2356            "127.0.0.1:50085",
2357            "create-invalid-config-server",
2358            "create-invalid-config-client",
2359        )
2360        .await;
2361
2362        let controller = control_plane_client.controller.clone();
2363        let ctrl_msg = ControlMessage {
2364            message_id: uuid::Uuid::new_v4().to_string(),
2365            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2366                connections_to_create: vec![v1::Connection {
2367                    link_id: "invalid-config-conn".to_string(),
2368                    config_data: "{invalid-json".to_string(),
2369                }],
2370                connections_to_delete: vec![],
2371                routes_to_set: vec![],
2372                routes_to_delete: vec![],
2373                reconcile: false,
2374            })),
2375        };
2376        let (tx, mut rx) = mpsc::channel(1);
2377        controller
2378            .handle_new_control_message(ctrl_msg, &tx)
2379            .await
2380            .expect("config command must be handled");
2381
2382        let ack_msg = rx
2383            .recv()
2384            .await
2385            .expect("expected ack message")
2386            .expect("ack should be ok");
2387        let ack = match ack_msg.payload {
2388            Some(Payload::ConfigCommandAck(ack)) => ack,
2389            _ => panic!("expected ConfigCommandAck payload"),
2390        };
2391        assert_eq!(ack.connections_status.len(), 1);
2392        assert!(!ack.connections_status[0].success);
2393        assert!(
2394            ack.connections_status[0]
2395                .error_msg
2396                .contains("Failed to parse config")
2397        );
2398    }
2399
2400    #[tokio::test]
2401    #[traced_test]
2402    async fn test_subscription_delete_unknown_link_id_fails_ack() {
2403        let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2404            "127.0.0.1:50086",
2405            "sub-del-linkid-server-unknown",
2406            "sub-del-linkid-client-unknown",
2407        )
2408        .await;
2409
2410        let controller = control_plane_client.controller.clone();
2411        let ctrl_msg = ControlMessage {
2412            message_id: uuid::Uuid::new_v4().to_string(),
2413            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2414                connections_to_create: vec![],
2415                connections_to_delete: vec![],
2416                routes_to_set: vec![],
2417                routes_to_delete: vec![v1::Route {
2418                    name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2419                    link_id: Some("missing-link-id-delete".to_string()),
2420                    direction: None,
2421                }],
2422                reconcile: false,
2423            })),
2424        };
2425        let (tx, mut rx) = mpsc::channel(1);
2426        controller
2427            .handle_new_control_message(ctrl_msg, &tx)
2428            .await
2429            .expect("config command must be handled");
2430
2431        let ack_msg = rx
2432            .recv()
2433            .await
2434            .expect("expected ack message")
2435            .expect("ack should be ok");
2436        let ack = match ack_msg.payload {
2437            Some(Payload::ConfigCommandAck(ack)) => ack,
2438            _ => panic!("expected ConfigCommandAck payload"),
2439        };
2440
2441        assert_eq!(ack.routes_status.len(), 1);
2442        assert!(!ack.routes_status[0].success);
2443        assert!(
2444            ack.routes_status[0]
2445                .error_msg
2446                .contains("Connection with link_id missing-link-id-delete not found")
2447        );
2448
2449        drop(control_plane_server);
2450    }
2451
2452    #[tokio::test]
2453    #[traced_test]
2454    async fn test_shutdown_drains_resources() {
2455        // Use a unique port to avoid conflicts with other tests.
2456        let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2457            setup_control_planes(
2458                "127.0.0.1:50071",
2459                "shutdown-server-instance",
2460                "shutdown-client-instance",
2461            )
2462            .await;
2463
2464        // Run both ends to populate cancellation tokens.
2465        control_plane_server
2466            .run()
2467            .await
2468            .expect("server should start");
2469        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2470        control_plane_client
2471            .run()
2472            .await
2473            .expect("client should start");
2474        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2475
2476        // Ensure we have at least one cancellation token (server or client side tasks).
2477        let server_tokens_before = control_plane_server
2478            .controller
2479            .inner
2480            .cancellation_tokens
2481            .read()
2482            .len();
2483        assert!(
2484            server_tokens_before > 0,
2485            "expected server to have active cancellation tokens before shutdown"
2486        );
2487
2488        let client_tokens_before = control_plane_client
2489            .controller
2490            .inner
2491            .cancellation_tokens
2492            .read()
2493            .len();
2494        assert!(
2495            client_tokens_before > 0,
2496            "expected client to have active cancellation tokens before shutdown"
2497        );
2498
2499        // Perform shutdown on both.
2500        control_plane_client
2501            .shutdown()
2502            .await
2503            .expect("client shutdown ok");
2504        control_plane_server
2505            .shutdown()
2506            .await
2507            .expect("server shutdown ok");
2508
2509        // After shutdown, all cancellation tokens should be drained.
2510        let server_tokens_after = control_plane_server
2511            .controller
2512            .inner
2513            .cancellation_tokens
2514            .read()
2515            .len();
2516        assert_eq!(
2517            server_tokens_after, 0,
2518            "expected server cancellation tokens to be drained after shutdown"
2519        );
2520
2521        let client_tokens_after = control_plane_client
2522            .controller
2523            .inner
2524            .cancellation_tokens
2525            .read()
2526            .len();
2527        assert_eq!(
2528            client_tokens_after, 0,
2529            "expected client cancellation tokens to be drained after shutdown"
2530        );
2531
2532        // Second shutdown should error because drain_signal has been taken.
2533        assert!(
2534            control_plane_server.shutdown().await.is_err(),
2535            "second shutdown on server should return an error"
2536        );
2537        assert!(
2538            control_plane_client.shutdown().await.is_err(),
2539            "second shutdown on client should return an error"
2540        );
2541    }
2542
2543    #[tokio::test]
2544    #[traced_test]
2545    async fn test_shutdown_without_run() {
2546        // Build a control plane but do NOT call run()
2547        let (control_plane_server, mut _control_plane_client, _client_cfg) = setup_control_planes(
2548            "127.0.0.1:50072",
2549            "shutdown-no-run-server",
2550            "shutdown-no-run-client",
2551        )
2552        .await;
2553
2554        // No tasks should be registered yet
2555        assert_eq!(
2556            control_plane_server
2557                .controller
2558                .inner
2559                .cancellation_tokens
2560                .read()
2561                .len(),
2562            0,
2563            "expected zero cancellation tokens before shutdown when not run"
2564        );
2565
2566        // Shutdown should still succeed gracefully.
2567        control_plane_server
2568            .shutdown()
2569            .await
2570            .expect("shutdown without prior run should succeed");
2571
2572        // Tokens remain zero.
2573        assert_eq!(
2574            control_plane_server
2575                .controller
2576                .inner
2577                .cancellation_tokens
2578                .read()
2579                .len(),
2580            0,
2581            "expected zero cancellation tokens after shutdown when not run"
2582        );
2583
2584        // Second shutdown should fail.
2585        assert!(
2586            control_plane_server.shutdown().await.is_err(),
2587            "second shutdown should error due to missing drain signal"
2588        );
2589    }
2590}