1use std::collections::{HashMap, VecDeque};
5use std::pin::Pin;
6use std::sync::Arc;
7
8use std::time::Duration;
9
10use display_error_chain::ErrorChainExt;
11use slim_config::server::ServerConfig;
12use slim_session::subscription_manager::SubscriptionManager;
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
16use tokio_util::sync::CancellationToken;
17use tonic::{Request, Response, Status};
18use tracing::{debug, error, info};
19
20use crate::api::proto::api::v1::control_message::Payload;
21use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
22use crate::api::proto::api::v1::{
23 self, ConnectionDetails, ConnectionDirection, ConnectionListResponse, ConnectionType,
24 RouteListResponse,
25};
26use crate::api::proto::api::v1::{
27 ConnectionEntry, ControlMessage, RouteEntry,
28 controller_service_client::ControllerServiceClient,
29 controller_service_server::ControllerService as GrpcControllerService,
30};
31use crate::errors::ControllerError;
32use prost_types::Struct;
33use slim_config::client::{ClientConfig, TransportChannel};
34use slim_datapath::api::{
35 MessageType::Subscribe, MessageType::SubscriptionAck as SubscriptionAckType,
36 MessageType::Unsubscribe, ProtoMessage as DataPlaneMessage,
37};
38use slim_datapath::api::{NameId, ProtoName};
39use slim_datapath::message_processing::MessageProcessor;
40use slim_datapath::messages::utils::SlimHeaderFlags;
41use slim_datapath::tables::SubscriptionTable;
42
/// Sending half of a channel carrying control messages (or a gRPC `Status` error).
type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
/// Control-message senders keyed by a string; `run_client` inserts entries keyed by
/// the client endpoint.
type TxChannels = HashMap<String, TxChannel>;

/// Upper bound on queued notifications.
// NOTE(review): the use site is outside this view; presumably this caps
// `pending_notifications` — confirm.
const MAX_QUEUED_NOTIFICATIONS: usize = 1000;

/// How long to wait for a subscription acknowledgement from the data plane before
/// the pending ack is cancelled and the operation is reported as timed out.
const SUBSCRIPTION_ACK_TIMEOUT: Duration = Duration::from_secs(30);
51
/// Inputs consumed by [`ControlPlane::new`].
#[derive(Clone)]
pub struct ControlPlaneSettings {
    /// Identifier of this node; sent as the node id in deregistration requests.
    pub id: String,
    /// Optional group name, stored on the controller service.
    // NOTE(review): how the group name is used is not visible here — confirm.
    pub group_name: Option<String>,
    /// gRPC servers started by `ControlPlane::run`.
    pub servers: Vec<ServerConfig>,
    /// Remote endpoints that `ControlPlane::run` connects to.
    pub clients: Vec<ClientConfig>,
    /// Data-plane message processor; a local connection is registered on it in
    /// `ControlPlane::new`, and it is used to create and drop connections.
    pub message_processor: MessageProcessor,
    /// Connection details stored on the controller service.
    // NOTE(review): presumably advertised to the remote controller — confirm.
    pub connection_details: Vec<ConnectionDetails>,
}
69
/// Shared state behind [`ControllerService`]; always accessed through an `Arc`.
struct ControllerServiceInternal {
    /// Identifier of this node.
    id: String,

    /// Optional group name supplied through the settings.
    group_name: Option<String>,

    /// Data-plane message processor used to look up, create and drop connections.
    message_processor: MessageProcessor,

    /// Sender side of the local data-plane connection registered in `ControlPlane::new`.
    tx_slim: mpsc::Sender<Result<DataPlaneMessage, Status>>,

    /// Control-message senders keyed by client endpoint.
    tx_channels: parking_lot::RwLock<TxChannels>,

    /// Cancellation tokens keyed by endpoint (plus `"DATA_PLANE"` for the data-plane
    /// listener); all are cancelled on stop, shutdown and drop.
    cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,

    /// Drain watch handed to spawned tasks; taken (set to `None`) during shutdown.
    drain_watch: parking_lot::RwLock<Option<drain::Watch>>,

    /// Queue of control messages awaiting delivery.
    // NOTE(review): producers/consumers are outside this view — confirm semantics.
    pending_notifications: Arc<parking_lot::Mutex<VecDeque<ControlMessage>>>,

    /// Tracks pending subscription acknowledgements (register / cancel / resolve).
    subscription_manager: SubscriptionManager,

    /// Connection details supplied through the settings.
    connection_details: Vec<ConnectionDetails>,

    /// Subscription id recorded for each `(route name, connection id)` pair that was
    /// successfully subscribed.
    route_subscription_ids: parking_lot::Mutex<HashMap<(ProtoName, u64), u64>>,

    /// Cache from link id to connection id; entries are validated against the
    /// connection table before being trusted.
    link_id_to_conn_id: parking_lot::RwLock<HashMap<String, u64>>,

    /// Join handles of spawned stream tasks, keyed by a string.
    // NOTE(review): key meaning (presumably endpoint) is not visible here — confirm.
    stream_handles: parking_lot::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
}
115
/// Cheaply clonable handle to the controller state; every clone shares the same
/// [`ControllerServiceInternal`].
#[derive(Clone)]
struct ControllerService {
    /// Shared state.
    inner: Arc<ControllerServiceInternal>,
}
121
/// Owner of the control-plane servers, clients and the data-plane listener.
pub struct ControlPlane {
    /// Servers started by `run`.
    servers: Vec<ServerConfig>,

    /// Clients connected by `run`.
    clients: Vec<ClientConfig>,

    /// Drain signal; taken by `shutdown`, so a second shutdown reports `AlreadyStopped`.
    drain_signal: parking_lot::RwLock<Option<drain::Signal>>,

    /// Shared controller service handle.
    controller: ControllerService,

    /// Receiver side of the local data-plane connection; taken by `run`, so a second
    /// `run` reports `AlreadyStarted`.
    rx_slim_option: Option<mpsc::Receiver<Result<DataPlaneMessage, Status>>>,
}
140
141impl Drop for ControlPlane {
144 fn drop(&mut self) {
145 for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
147 token.cancel();
148 }
149 }
150}
151
152pub(crate) fn from_server_config(server_config: &ServerConfig) -> ConnectionDetails {
153 let mut endpoint = server_config.endpoint.clone();
154 let mut external_endpoint = None;
155 let mut spire_socket_path = None;
156 let mut trust_domain = None;
157 let mut remaining_fields = std::collections::BTreeMap::new();
158
159 if let Some(m) = &server_config.metadata {
160 for (k, v) in &m.inner {
161 match k.as_str() {
162 "local_endpoint" => {
163 if let Some(s) = v.as_str()
164 && !s.is_empty()
165 {
166 endpoint = s.to_string();
167 }
168 }
169 "external_endpoint" => {
170 if let Some(s) = v.as_str()
171 && !s.is_empty()
172 {
173 external_endpoint = Some(s.to_string());
174 }
175 }
176 "spire_socket_path" => {
177 if let Some(s) = v.as_str()
178 && !s.is_empty()
179 {
180 spire_socket_path = Some(s.to_string());
181 }
182 }
183 "trust_domain" => {
184 if let Some(s) = v.as_str()
185 && !s.is_empty()
186 {
187 trust_domain = Some(s.to_string());
188 }
189 }
190 _ => {
191 remaining_fields.insert(k.clone(), prost_types::Value::from(v));
192 }
193 }
194 }
195 }
196
197 let spire_mtls = if !server_config.tls_setting.insecure || spire_socket_path.is_some() {
198 Some(v1::connection_details::SpireMtls {
199 socket_path: spire_socket_path.unwrap_or_default(),
200 trust_domain,
201 })
202 } else {
203 None
204 };
205
206 let metadata = if remaining_fields.is_empty() {
207 None
208 } else {
209 Some(Struct {
210 fields: remaining_fields,
211 })
212 };
213
214 ConnectionDetails {
215 endpoint,
216 external_endpoint,
217 spire_mtls,
218 metadata,
219 }
220}
221
222impl ControlPlane {
224 pub fn new(config: ControlPlaneSettings) -> Self {
237 let (_, tx_slim, rx_slim) = config
239 .message_processor
240 .register_local_connection(true)
241 .unwrap();
242
243 let (signal, watch) = drain::channel();
244
245 ControlPlane {
246 servers: config.servers,
247 clients: config.clients,
248 controller: ControllerService {
249 inner: Arc::new(ControllerServiceInternal {
250 id: config.id,
251 group_name: config.group_name,
252 message_processor: config.message_processor,
253 subscription_manager: SubscriptionManager::new(tx_slim.clone()),
254 tx_slim,
255 tx_channels: parking_lot::RwLock::new(HashMap::new()),
256 cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
257 drain_watch: parking_lot::RwLock::new(Some(watch)),
258 pending_notifications: Arc::new(parking_lot::Mutex::new(VecDeque::new())),
259 connection_details: config.connection_details,
260 route_subscription_ids: parking_lot::Mutex::new(HashMap::new()),
261 link_id_to_conn_id: parking_lot::RwLock::new(HashMap::new()),
262 stream_handles: parking_lot::Mutex::new(HashMap::new()),
263 }),
264 },
265 drain_signal: parking_lot::RwLock::new(Some(signal)),
266 rx_slim_option: Some(rx_slim),
267 }
268 }
269
/// Replaces the list of clients to connect to and returns the updated control plane.
pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
    self.clients = clients;
    self
}
275
/// Replaces the list of servers to start and returns the updated control plane.
pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
    self.servers = servers;
    self
}
281
282 pub async fn run(&mut self) -> Result<(), ControllerError> {
289 let rx = self
290 .rx_slim_option
291 .take()
292 .ok_or(ControllerError::AlreadyStarted)?;
293
294 let servers = self.servers.clone();
296 let clients = self.clients.clone();
297
298 for server in servers {
300 self.run_server(server).await?;
301 }
302
303 for client in clients {
305 self.run_client(client).await?;
306 }
307
308 self.listen_from_data_plane(rx).await?;
309
310 Ok(())
311 }
312
313 pub async fn deregister(&self) -> Result<(), ControllerError> {
314 let node_id = self.controller.inner.id.clone();
315 let deregister_msg = ControlMessage {
316 message_id: uuid::Uuid::new_v4().to_string(),
317 payload: Some(Payload::DeregisterNodeRequest(v1::DeregisterNodeRequest {
318 node: Some(v1::Node { id: node_id }),
319 })),
320 };
321 let channels: Vec<(String, TxChannel)> = self
322 .controller
323 .inner
324 .tx_channels
325 .read()
326 .iter()
327 .map(|(ep, tx)| (ep.clone(), tx.clone()))
328 .collect();
329 for (endpoint, tx) in channels {
330 if let Err(e) = tx.send(Ok(deregister_msg.clone())).await {
331 error!(%endpoint, error = %e, "failed to send deregister request");
332 }
333 }
334 Ok(())
335 }
336
337 pub async fn shutdown(&self) -> Result<(), ControllerError> {
338 let signal = self
340 .drain_signal
341 .write()
342 .take()
343 .ok_or(ControllerError::AlreadyStopped)?;
344
345 self.controller
347 .inner
348 .cancellation_tokens
349 .write()
350 .drain()
351 .for_each(|(endpoint, token)| {
352 info!(%endpoint, "stopping");
353 token.cancel();
354 });
355
356 self.controller.inner.drain_watch.write().take();
358
359 signal.drain().await;
361
362 Ok(())
363 }
364
365 async fn listen_from_data_plane(
366 &mut self,
367 mut rx: mpsc::Receiver<Result<DataPlaneMessage, Status>>,
368 ) -> Result<(), ControllerError> {
369 let cancellation_token = CancellationToken::new();
370 let cancellation_token_clone = cancellation_token.clone();
371
372 self.controller
373 .inner
374 .cancellation_tokens
375 .write()
376 .insert("DATA_PLANE".to_string(), cancellation_token_clone);
377
378 let clients = self.clients.clone();
379 let controller = self.controller.clone();
380
381 let watch = self.controller.drain_watch()?;
383
384 debug!("Starting data plane listener");
385 tokio::spawn(async move {
386 let mut drain_fut = std::pin::pin!(watch.signaled());
387 loop {
388 tokio::select! {
389 next = rx.recv() => {
390 match next {
391 Some(res) => {
392 match res {
393 Ok(msg) => {
394 debug!("Send sub/unsub to control plane for message: {:?}", msg);
395 match msg.get_type() {
396 Subscribe(_) => {
397 controller.handle_subscribe_message(msg.get_dst(), &clients).await;
398 }
399 Unsubscribe(_) => {
400 controller.handle_unsubscribe_message(msg.get_dst(), &clients).await;
401 }
402 SubscriptionAckType(_) => {
403 controller.inner.subscription_manager.resolve_ack(msg.get_subscription_ack());
404 }
405 _ => {
406 debug!("Ignoring unexpected message type from dataplane: {:?}", msg.get_type());
407 }
408 }
409 }
410 Err(e) => {
411 error!(error = %e.chain(), "received error from the data plane");
412 continue;
413 }
414 }
415 }
416 None => {
417 debug!("Data plane receiver channel closed.");
418 break;
419 }
420 }
421 }
422 _ = cancellation_token.cancelled() => {
423 debug!("shutting down stream on cancellation token");
424 break;
425 }
426 _ = &mut drain_fut => {
427 debug!("shutting down stream on drain");
428 break;
429 }
430 }
431 }
432 });
433 Ok(())
434 }
435
436 pub fn stop(&mut self) {
440 info!("stopping controller service");
441
442 for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
444 info!(%endpoint, "stopping");
445 token.cancel();
446 }
447 }
448
449 async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
453 if self
454 .controller
455 .inner
456 .cancellation_tokens
457 .read()
458 .contains_key(&client.endpoint)
459 {
460 return Err(ControllerError::ClientAlreadyRunning(client.endpoint));
461 }
462
463 let cancellation_token = CancellationToken::new();
464
465 let tx = self
466 .controller
467 .connect(client.clone(), cancellation_token.clone())
468 .await?;
469
470 self.controller
472 .inner
473 .cancellation_tokens
474 .write()
475 .insert(client.endpoint.clone(), cancellation_token);
476
477 self.controller
479 .inner
480 .tx_channels
481 .write()
482 .insert(client.endpoint.clone(), tx);
483
484 Ok(())
486 }
487
488 pub async fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
492 if self
494 .controller
495 .inner
496 .cancellation_tokens
497 .read()
498 .contains_key(&config.endpoint)
499 {
500 error!(endpoint = config.endpoint, "server is already running",);
501 return Err(ControllerError::ServerAlreadyRunning(config.endpoint));
502 }
503
504 let token = config
505 .run_grpc_server(
506 &[ControllerServiceServer::new(self.controller.clone())],
507 self.controller.drain_watch()?,
508 )
509 .await?;
510
511 self.controller
513 .inner
514 .cancellation_tokens
515 .write()
516 .insert(config.endpoint.clone(), token.clone());
517
518 info!(%config.endpoint, "started controlplane server");
519
520 Ok(())
521 }
522}
523
524impl ControllerService {
525 fn resolve_connection_by_link_id(&self, link_id: &str) -> Result<Option<u64>, String> {
526 let cached = self.inner.link_id_to_conn_id.read().get(link_id).copied();
527 if let Some(conn_id) = cached {
528 if self
529 .inner
530 .message_processor
531 .connection_table()
532 .get(conn_id)
533 .is_some_and(|conn| conn.link_id().as_deref() == Some(link_id))
534 {
535 return Ok(Some(conn_id));
536 }
537 self.inner.link_id_to_conn_id.write().remove(link_id);
538 }
539
540 let mut resolved: Option<u64> = None;
541 self.inner
542 .message_processor
543 .connection_table()
544 .for_each(|id, conn| {
545 if conn.link_id().as_deref() == Some(link_id) && resolved.is_none() {
546 resolved = Some(id);
547 }
548 });
549
550 if let Some(conn_id) = resolved {
551 self.inner
552 .link_id_to_conn_id
553 .write()
554 .insert(link_id.to_string(), conn_id);
555 }
556
557 Ok(resolved)
558 }
559
560 fn disconnect_connection_by_link_id(&self, link_id: &str) -> Result<(), String> {
561 if link_id.trim().is_empty() {
562 return Err("link_id cannot be empty".to_string());
563 }
564
565 let conn_id = match self.resolve_connection_by_link_id(link_id)? {
566 Some(id) => id,
567 None => {
568 return Err(format!("Connection with link_id {} not found", link_id));
569 }
570 };
571
572 if let Err(e) = self.inner.message_processor.disconnect(conn_id) {
573 info!(
575 link_id = %link_id,
576 conn_id,
577 error = %e,
578 "Disconnect returned an error; continuing delete flow"
579 );
580 }
581
582 self.inner.link_id_to_conn_id.write().remove(link_id);
583 self.inner
584 .route_subscription_ids
585 .lock()
586 .retain(|(_name, cid), _| *cid != conn_id);
587
588 info!(link_id = %link_id, conn_id, "Successfully deleted connection by link_id");
589 Ok(())
590 }
591
592 fn resolve_route_connection(&self, route: &v1::Route) -> Result<Option<u64>, String> {
593 if let Some(link_id) = &route.link_id {
594 let trimmed = link_id.trim();
595 if !trimmed.is_empty() {
596 return self.resolve_connection_by_link_id(trimmed);
597 }
598 }
599
600 Ok(None)
601 }
602
603 async fn diff_connections(
604 &self,
605 desired_connections: &[v1::Connection],
606 ) -> Vec<v1::ConnectionAck> {
607 let mut connections_status = Vec::new();
608
609 let desired_link_ids: std::collections::HashSet<String> = desired_connections
610 .iter()
611 .map(|c| c.link_id.clone())
612 .collect();
613
614 let mut live_outgoing_link_ids: Vec<String> = Vec::new();
615 self.inner
616 .message_processor
617 .connection_table()
618 .for_each(|_id, conn| {
619 if conn.is_outgoing()
620 && let Some(lid) = conn.link_id()
621 && !lid.is_empty()
622 {
623 live_outgoing_link_ids.push(lid);
624 }
625 });
626
627 for link_id in &live_outgoing_link_ids {
628 if desired_link_ids.contains(link_id) {
629 continue;
630 }
631 info!(link_id = %link_id, "desired state: removing connection");
632 let mut success = true;
633 let mut error_msg = String::new();
634 if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
635 success = false;
636 error_msg = err;
637 }
638 connections_status.push(v1::ConnectionAck {
639 link_id: link_id.clone(),
640 success,
641 error_msg,
642 });
643 }
644
645 for conn in desired_connections {
646 let link_id = &conn.link_id;
647 if link_id.is_empty() {
648 continue;
649 }
650
651 let already_exists = match self.resolve_connection_by_link_id(link_id) {
652 Ok(Some(_)) => true,
653 Ok(None) => false,
654 Err(err) => {
655 connections_status.push(v1::ConnectionAck {
656 link_id: link_id.clone(),
657 success: false,
658 error_msg: err,
659 });
660 continue;
661 }
662 };
663
664 if already_exists {
665 connections_status.push(v1::ConnectionAck {
666 link_id: link_id.clone(),
667 success: true,
668 error_msg: String::new(),
669 });
670 continue;
671 }
672
673 info!(?conn, "desired state: creating connection");
674 let mut success = true;
675 let mut error_msg = String::new();
676
677 match serde_json::from_str::<ClientConfig>(&conn.config_data) {
678 Err(e) => {
679 success = false;
680 error_msg = format!("Failed to parse config: {}", e);
681 }
682 Ok(client_config) => {
683 match self
684 .inner
685 .message_processor
686 .connect(client_config.clone(), None, None)
687 .await
688 {
689 Err(e) => {
690 success = false;
691 error_msg = format!("Connection failed: {}", e);
692 }
693 Ok(conn_id) => {
694 let requested_link_id = if client_config.link_id.trim().is_empty() {
695 String::new()
696 } else {
697 client_config.link_id.clone()
698 };
699 if !requested_link_id.is_empty() {
700 self.inner
701 .link_id_to_conn_id
702 .write()
703 .insert(requested_link_id, conn_id.1);
704 }
705 info!(
706 link_id = %link_id,
707 "Successfully created connection"
708 );
709 }
710 }
711 }
712 }
713
714 connections_status.push(v1::ConnectionAck {
715 link_id: link_id.clone(),
716 success,
717 error_msg,
718 });
719 }
720
721 connections_status
722 }
723
724 fn resolve_desired_routes<'a>(
725 &self,
726 desired_routes: &'a [v1::Route],
727 ) -> (HashMap<(ProtoName, u64), &'a v1::Route>, Vec<v1::RouteAck>) {
728 type SubKey = (ProtoName, u64);
729 let mut desired_subs: HashMap<SubKey, &v1::Route> = HashMap::new();
730 let mut failures: Vec<v1::RouteAck> = Vec::new();
731
732 for sub in desired_routes {
733 match self.resolve_route_connection(sub) {
734 Ok(Some(conn_id)) => {
735 let name = sub.name.clone().unwrap();
736 desired_subs.insert((name, conn_id), sub);
737 }
738 Ok(None) => {
739 failures.push(v1::RouteAck {
740 route: Some(sub.clone()),
741 success: false,
742 error_msg: "connection not found".to_string(),
743 });
744 }
745 Err(err) => {
746 failures.push(v1::RouteAck {
747 route: Some(sub.clone()),
748 success: false,
749 error_msg: err,
750 });
751 }
752 }
753 }
754
755 (desired_subs, failures)
756 }
757
758 async fn delete_stale_subscriptions(
759 &self,
760 desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
761 ) -> Vec<v1::RouteAck> {
762 let active_subs: Vec<((ProtoName, u64), u64)> = self
763 .inner
764 .route_subscription_ids
765 .lock()
766 .iter()
767 .map(|((name, conn_id), sub_id)| ((name.clone(), *conn_id), *sub_id))
768 .collect();
769
770 let stale: Vec<_> = active_subs
771 .into_iter()
772 .filter(|((name, conn_id), _)| !desired_subs.contains_key(&(name.clone(), *conn_id)))
773 .collect();
774
775 let futs = stale.iter().map(|((name, conn_id), sub_id)| {
776 let name = name.clone();
777 let conn_id = *conn_id;
778 let sub_id = *sub_id;
779 async move {
780 let conn_alive = self
781 .inner
782 .message_processor
783 .connection_table()
784 .get(conn_id)
785 .is_some();
786
787 let (success, error_msg) = if conn_alive {
788 let source = name.clone().with_id(0);
789 let unsub_msg = DataPlaneMessage::builder()
790 .source(source)
791 .destination(name.clone())
792 .identity("")
793 .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
794 .build_unsubscribe()
795 .unwrap();
796
797 match self
798 .send_unsubscribe_message_with_ack(unsub_msg, sub_id)
799 .await
800 {
801 Ok(()) => (true, String::new()),
802 Err(err) => (false, format!("Failed to unsubscribe: {}", err)),
803 }
804 } else {
805 (true, String::new())
806 };
807
808 (name, conn_id, success, error_msg)
809 }
810 });
811
812 let results = futures::future::join_all(futs).await;
813
814 let mut routes_status = Vec::with_capacity(results.len());
815 for (name, conn_id, success, error_msg) in results {
816 if success {
817 self.inner
818 .route_subscription_ids
819 .lock()
820 .remove(&(name.clone(), conn_id));
821 }
822
823 routes_status.push(v1::RouteAck {
824 route: Some(v1::Route {
825 name: Some(name.clone()),
826 link_id: None,
827 direction: None,
828 }),
829 success,
830 error_msg,
831 });
832 }
833
834 routes_status
835 }
836
837 async fn create_new_subscriptions(
838 &self,
839 desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
840 ) -> Vec<v1::RouteAck> {
841 let mut routes_status = Vec::new();
842
843 let to_create: Vec<((ProtoName, u64), v1::Route)> = desired_subs
844 .iter()
845 .filter(|((name, conn_id), _)| {
846 !self
847 .inner
848 .route_subscription_ids
849 .lock()
850 .contains_key(&(name.clone(), *conn_id))
851 })
852 .map(|((name, conn_id), sub)| ((name.clone(), *conn_id), (*sub).clone()))
853 .collect();
854
855 for ((name, conn_id), sub) in desired_subs {
857 let dominated = to_create
858 .iter()
859 .any(|((n, c), _)| n == name && *c == *conn_id);
860 if !dominated {
861 routes_status.push(v1::RouteAck {
862 route: Some((*sub).clone()),
863 success: true,
864 error_msg: String::new(),
865 });
866 }
867 }
868
869 let futs = to_create.iter().map(|((name, conn_id), sub)| {
870 let name = name.clone();
871 let conn_id = *conn_id;
872 let sub = sub.clone();
873 async move {
874 let source = name.clone().with_id(0);
875 let sub_msg = DataPlaneMessage::builder()
876 .source(source)
877 .destination(name.clone())
878 .identity("")
879 .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
880 .build_subscribe()
881 .unwrap();
882
883 let result = self.send_subscribe_message_with_ack(sub_msg).await;
884 (name, conn_id, sub, result)
885 }
886 });
887
888 let results = futures::future::join_all(futs).await;
889
890 for (name, conn_id, sub, result) in results {
891 let (success, error_msg) = match result {
892 Ok(subscription_id) => {
893 self.inner
894 .route_subscription_ids
895 .lock()
896 .insert((name, conn_id), subscription_id);
897 info!(?sub, "desired state: created route");
898 (true, String::new())
899 }
900 Err(err) => (false, format!("Failed to subscribe: {}", err)),
901 };
902
903 routes_status.push(v1::RouteAck {
904 route: Some(sub),
905 success,
906 error_msg,
907 });
908 }
909
910 routes_status
911 }
912
913 async fn handle_new_control_message(
915 &self,
916 msg: ControlMessage,
917 tx: &mpsc::Sender<Result<ControlMessage, Status>>,
918 ) -> Result<(), ControllerError> {
919 match msg.payload {
920 Some(ref payload) => {
921 match payload {
922 Payload::ConfigCommand(config) => {
923 let (connections_status, routes_status) = if config.reconcile {
924 let connections_status =
925 self.diff_connections(&config.connections_to_create).await;
926 let (desired_subs, mut routes_status) =
927 self.resolve_desired_routes(&config.routes_to_set);
928 routes_status
929 .extend(self.delete_stale_subscriptions(&desired_subs).await);
930 routes_status
931 .extend(self.create_new_subscriptions(&desired_subs).await);
932 (connections_status, routes_status)
933 } else {
934 let mut connections_status = Vec::new();
935 let mut routes_status = Vec::new();
936
937 for link_id in &config.connections_to_delete {
939 info!(link_id = %link_id, "received a connection to delete");
940 let mut connection_success = true;
941 let mut connection_error_msg = String::new();
942
943 if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
944 connection_success = false;
945 connection_error_msg = err;
946 }
947
948 connections_status.push(v1::ConnectionAck {
949 link_id: link_id.clone(),
950 success: connection_success,
951 error_msg: connection_error_msg,
952 });
953 }
954
955 for conn in &config.connections_to_create {
957 info!(?conn, "received a connection to create");
958 let mut connection_success = true;
959 let mut connection_error_msg = String::new();
960
961 match serde_json::from_str::<ClientConfig>(&conn.config_data) {
962 Err(e) => {
963 connection_success = false;
964 connection_error_msg =
965 format!("Failed to parse config: {}", e);
966 }
967 Ok(client_config) => {
968 let client_endpoint = &client_config.endpoint;
969 let requested_link_id =
970 if client_config.link_id.trim().is_empty() {
971 String::new()
972 } else {
973 client_config.link_id.clone()
974 };
975 let mut existing_conn_for_link_id = false;
976
977 if !requested_link_id.is_empty() {
978 match self
979 .resolve_connection_by_link_id(&requested_link_id)
980 {
981 Err(err) => {
982 connection_success = false;
983 connection_error_msg = err;
984 }
985 Ok(Some(conn_id)) => {
986 existing_conn_for_link_id = true;
987 self.inner
988 .link_id_to_conn_id
989 .write()
990 .insert(requested_link_id.clone(), conn_id);
991 info!(
992 link_id = %requested_link_id,
993 conn_id,
994 "Connection already exists for link_id"
995 );
996 }
997 Ok(None) => {}
998 }
999 }
1000
1001 if connection_success && !existing_conn_for_link_id {
1002 match self
1003 .inner
1004 .message_processor
1005 .connect(client_config.clone(), None, None)
1006 .await
1007 {
1008 Err(e) => {
1009 connection_success = false;
1010 connection_error_msg =
1011 format!("Connection failed: {}", e);
1012 }
1013 Ok(conn_id) => {
1014 if !requested_link_id.is_empty() {
1015 self.inner
1016 .link_id_to_conn_id
1017 .write()
1018 .insert(
1019 requested_link_id.clone(),
1020 conn_id.1,
1021 );
1022 }
1023 info!(
1024 endpoint = %client_endpoint, "Successfully created connection",
1025 );
1026 }
1027 }
1028 }
1029 }
1030 }
1031
1032 connections_status.push(v1::ConnectionAck {
1034 link_id: conn.link_id.clone(),
1035 success: connection_success,
1036 error_msg: connection_error_msg,
1037 });
1038 }
1039
1040 for route in &config.routes_to_set {
1042 let mut route_success = true;
1043 let mut route_error_msg = String::new();
1044
1045 let conn = self.resolve_route_connection(route);
1046
1047 if let Ok(Some(conn)) = conn {
1048 let name = route.name.clone().unwrap();
1049 let source = name.clone().with_id(NameId::NULL_COMPONENT);
1050
1051 let msg = DataPlaneMessage::builder()
1052 .source(source.clone())
1053 .destination(name.clone())
1054 .identity("")
1055 .flags(SlimHeaderFlags::default().with_recv_from(conn))
1056 .build_subscribe()
1057 .unwrap();
1058
1059 match self.send_subscribe_message_with_ack(msg).await {
1060 Ok(subscription_id) => {
1061 self.inner
1063 .route_subscription_ids
1064 .lock()
1065 .insert((name.clone(), conn), subscription_id);
1066 info!(?route, "Successfully created route");
1067 }
1068 Err(err) => {
1069 route_success = false;
1070 route_error_msg =
1071 format!("Failed to subscribe: {}", err);
1072 }
1073 }
1074 } else {
1075 route_success = false;
1076 route_error_msg = match conn {
1077 Ok(None) => {
1078 format!(
1079 "Connection with link_id {} not found",
1080 route.link_id.as_deref().unwrap_or("<none>")
1081 )
1082 }
1083 Err(err) => err,
1084 _ => "unknown connection lookup error".to_string(),
1085 };
1086 }
1087
1088 routes_status.push(v1::RouteAck {
1090 route: Some(route.clone()),
1091 success: route_success,
1092 error_msg: route_error_msg,
1093 });
1094 }
1095
1096 for route in &config.routes_to_delete {
1098 let mut route_success = true;
1099 let mut route_error_msg = String::new();
1100
1101 let conn = self.resolve_route_connection(route);
1102
1103 if let Ok(Some(conn)) = conn {
1104 let name = route.name.clone().unwrap();
1105 let source = name.clone().with_id(NameId::NULL_COMPONENT);
1106
1107 let msg = DataPlaneMessage::builder()
1108 .source(source.clone())
1109 .destination(name.clone())
1110 .identity("")
1111 .flags(SlimHeaderFlags::default().with_recv_from(conn))
1112 .build_unsubscribe()
1113 .unwrap();
1114
1115 let sub_id = self
1116 .inner
1117 .route_subscription_ids
1118 .lock()
1119 .remove(&(name.clone(), conn));
1120 let unsubscribe_result = match sub_id {
1121 Some(subscription_id) => {
1122 self.send_unsubscribe_message_with_ack(
1123 msg,
1124 subscription_id,
1125 )
1126 .await
1127 }
1128 None => {
1129 let (ack_id, ack_rx) =
1135 self.inner.subscription_manager.register_ack();
1136 let mut fresh_msg = msg;
1137 fresh_msg.set_subscription_id(ack_id);
1138 if let Err(e) =
1139 self.send_control_message(fresh_msg).await
1140 {
1141 self.inner.subscription_manager.cancel_ack(ack_id);
1142 Err(format!("datapath send error: {}", e.chain()))
1143 } else {
1144 match tokio::time::timeout(
1145 SUBSCRIPTION_ACK_TIMEOUT,
1146 ack_rx,
1147 )
1148 .await
1149 {
1150 Ok(Ok(Ok(()))) => Ok(()),
1151 Ok(Ok(Err(err))) => Err(err.to_string()),
1152 Ok(Err(_)) => {
1153 Err("subscription ack channel closed"
1154 .to_string())
1155 }
1156 Err(_) => {
1157 self.inner
1158 .subscription_manager
1159 .cancel_ack(ack_id);
1160 Err("subscription ack timed out"
1161 .to_string())
1162 }
1163 }
1164 }
1165 }
1166 };
1167 if let Err(err) = unsubscribe_result {
1168 route_success = false;
1169 route_error_msg = format!("Failed to unsubscribe: {}", err);
1170 } else {
1171 info!(?route, "Successfully deleted route");
1172 }
1173 } else {
1174 route_success = false;
1175 route_error_msg = match conn {
1176 Ok(None) => {
1177 format!(
1178 "Connection with link_id {} not found",
1179 route.link_id.as_deref().unwrap_or("<none>")
1180 )
1181 }
1182 Err(err) => err,
1183 _ => "unknown connection lookup error".to_string(),
1184 };
1185 }
1186
1187 routes_status.push(v1::RouteAck {
1189 route: Some(route.clone()),
1190 success: route_success,
1191 error_msg: route_error_msg,
1192 });
1193 }
1194
1195 (connections_status, routes_status)
1196 };
1197
1198 let config_ack = v1::ConfigurationCommandAck {
1199 original_message_id: msg.message_id.clone(),
1200 connections_status,
1201 routes_status,
1202 };
1203
1204 let reply = ControlMessage {
1205 message_id: uuid::Uuid::new_v4().to_string(),
1206 payload: Some(Payload::ConfigCommandAck(config_ack)),
1207 };
1208
1209 if let Err(e) = tx.send(Ok(reply)).await {
1210 error!(error = %e.chain(), "failed to send ConfigurationCommandAck");
1211 }
1212
1213 info!(
1214 connections = %config.connections_to_create.len(),
1215 connections_to_delete = %config.connections_to_delete.len(),
1216 routes_to_set = %config.routes_to_set.len(),
1217 routes_to_del = %config.routes_to_delete.len(),
1218 "Processed ConfigurationCommand"
1219 );
1220 }
1221 Payload::RouteListRequest(_) => {
1222 const CHUNK_SIZE: usize = 100;
1223
1224 let conn_table = self.inner.message_processor.connection_table();
1225 let mut entries = Vec::new();
1226
1227 self.inner.message_processor.subscription_table().for_each(
1228 |name, id, local, remote, peer| {
1229 let mut entry = RouteEntry {
1230 name: Some(name.clone().with_id(id)),
1231 ..Default::default()
1232 };
1233
1234 for &cid in local {
1235 entry.connections.push(ConnectionEntry {
1236 id: cid,
1237 connection_type: ConnectionType::Local as i32,
1238 config_data: "{}".to_string(),
1239 link_id: None,
1240 direction: ConnectionDirection::Outgoing as i32,
1241 peer_node_id: None,
1242 });
1243 }
1244
1245 for &cid in remote {
1246 if let Some(conn) = conn_table.get(cid) {
1247 entry.connections.push(ConnectionEntry {
1248 id: cid,
1249 connection_type: ConnectionType::Remote as i32,
1250 config_data: match conn.config_data() {
1251 Some(data) => serde_json::to_string(data)
1252 .unwrap_or_else(|_| "{}".to_string()),
1253 None => "{}".to_string(),
1254 },
1255 link_id: conn.link_id(),
1256 direction: if conn.is_outgoing() {
1257 ConnectionDirection::Outgoing as i32
1258 } else {
1259 ConnectionDirection::Incoming as i32
1260 },
1261 peer_node_id: conn.peer_node_id().map(str::to_string),
1262 });
1263 } else {
1264 error!(%cid, "no connection entry for id");
1265 }
1266 }
1267
1268 for &cid in peer {
1269 if let Some(conn) = conn_table.get(cid) {
1270 entry.connections.push(ConnectionEntry {
1271 id: cid,
1272 connection_type: ConnectionType::Peer as i32,
1273 config_data: "{}".to_string(),
1274 link_id: conn.link_id(),
1275 direction: if conn.is_outgoing() {
1276 ConnectionDirection::Outgoing as i32
1277 } else {
1278 ConnectionDirection::Incoming as i32
1279 },
1280 peer_node_id: conn.peer_node_id().map(str::to_string),
1281 });
1282 } else {
1283 error!(%cid, "no connection entry for id (peer)");
1284 }
1285 }
1286 entries.push(entry);
1287 },
1288 );
1289
1290 let chunks: Vec<_> = entries.chunks(CHUNK_SIZE).collect();
1291 if chunks.is_empty() {
1292 let resp = ControlMessage {
1293 message_id: uuid::Uuid::new_v4().to_string(),
1294 payload: Some(Payload::RouteListResponse(RouteListResponse {
1295 original_message_id: msg.message_id.clone(),
1296 entries: vec![],
1297 done: true,
1298 })),
1299 };
1300 if let Err(e) = tx.send(Ok(resp)).await {
1301 error!(error = %e.chain(), "failed to send route list response");
1302 }
1303 } else {
1304 let n = chunks.len();
1305 for (i, chunk) in chunks.into_iter().enumerate() {
1306 let resp = ControlMessage {
1307 message_id: uuid::Uuid::new_v4().to_string(),
1308 payload: Some(Payload::RouteListResponse(RouteListResponse {
1309 original_message_id: msg.message_id.clone(),
1310 entries: chunk.to_vec(),
1311 done: i + 1 == n,
1312 })),
1313 };
1314 if let Err(e) = tx.send(Ok(resp)).await {
1315 error!(error = %e.chain(), "failed to send route batch");
1316 break;
1317 }
1318 }
1319 }
1320 }
1321 Payload::ConnectionListRequest(_) => {
1322 let mut all_entries = Vec::new();
1323 self.inner
1324 .message_processor
1325 .connection_table()
1326 .for_each(|id, conn| {
1327 debug!(
1328 conn_id = id,
1329 local_addr = ?conn.local_addr(),
1330 remote_addr = ?conn.remote_addr(),
1331 is_outgoing = conn.is_outgoing(),
1332 link_id = ?conn.link_id(),
1333 "connection entry",
1334 );
1335 all_entries.push(ConnectionEntry {
1336 id,
1337 connection_type: ConnectionType::Remote as i32,
1338 config_data: match conn.config_data() {
1339 Some(data) => serde_json::to_string(data)
1340 .unwrap_or_else(|_| "{}".to_string()),
1341 None => "{}".to_string(),
1342 },
1343 link_id: conn.link_id(),
1344 direction: if conn.is_outgoing() {
1345 ConnectionDirection::Outgoing as i32
1346 } else {
1347 ConnectionDirection::Incoming as i32
1348 },
1349 peer_node_id: conn.peer_node_id().map(str::to_string),
1350 });
1351 });
1352
1353 const CHUNK_SIZE: usize = 100;
1354 let chunks: Vec<_> = all_entries.chunks(CHUNK_SIZE).collect();
1355 if chunks.is_empty() {
1356 let resp = ControlMessage {
1357 message_id: uuid::Uuid::new_v4().to_string(),
1358 payload: Some(Payload::ConnectionListResponse(
1359 ConnectionListResponse {
1360 original_message_id: msg.message_id.clone(),
1361 entries: vec![],
1362 done: true,
1363 },
1364 )),
1365 };
1366 if let Err(e) = tx.send(Ok(resp)).await {
1367 error!(error = %e.chain(), "failed to send connection list response");
1368 }
1369 } else {
1370 let n = chunks.len();
1371 for (i, chunk) in chunks.into_iter().enumerate() {
1372 let resp = ControlMessage {
1373 message_id: uuid::Uuid::new_v4().to_string(),
1374 payload: Some(Payload::ConnectionListResponse(
1375 ConnectionListResponse {
1376 original_message_id: msg.message_id.clone(),
1377 entries: chunk.to_vec(),
1378 done: i + 1 == n,
1379 },
1380 )),
1381 };
1382 if let Err(e) = tx.send(Ok(resp)).await {
1383 error!(error = %e.chain(), "failed to send connection list batch");
1384 break;
1385 }
1386 }
1387 }
1388 }
1389 Payload::RegisterNodeRequest(_) => {
1390 error!("received a register node request");
1391 }
1392 Payload::DeregisterNodeRequest(_) => {
1393 error!("received a deregister node request");
1394 }
1395 _ => {
1396 debug!("received unsupported message type from control - ignoring");
1398 }
1399 }
1400 }
1401 None => {
1402 error!(
1403 message_id = %msg.message_id,
1404 "received control message with no payload",
1405 );
1406 }
1407 }
1408
1409 Ok(())
1410 }
1411
1412 async fn handle_subscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1413 let mut sub_vec = vec![];
1414
1415 let cmd = v1::Route {
1416 name: Some(dst),
1417 link_id: None,
1418 direction: None,
1419 };
1420
1421 sub_vec.push(cmd);
1422
1423 let ctrl = ControlMessage {
1424 message_id: uuid::Uuid::new_v4().to_string(),
1425 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1426 connections_to_create: vec![],
1427 connections_to_delete: vec![],
1428 routes_to_set: sub_vec,
1429 routes_to_delete: vec![],
1430 reconcile: false,
1431 })),
1432 };
1433
1434 return self.send_or_queue_notification(ctrl, clients).await;
1435 }
1436
1437 async fn handle_unsubscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1438 let mut unsub_vec = vec![];
1439
1440 let cmd = v1::Route {
1441 name: Some(dst),
1442 link_id: None,
1443 direction: None,
1444 };
1445
1446 unsub_vec.push(cmd);
1447
1448 let ctrl = ControlMessage {
1449 message_id: uuid::Uuid::new_v4().to_string(),
1450 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1451 connections_to_create: vec![],
1452 connections_to_delete: vec![],
1453 routes_to_set: vec![],
1454 routes_to_delete: unsub_vec,
1455 reconcile: false,
1456 })),
1457 };
1458
1459 return self.send_or_queue_notification(ctrl, clients).await;
1460 }
1461
1462 async fn send_subscribe_message_with_ack(
1464 &self,
1465 mut msg: DataPlaneMessage,
1466 ) -> Result<u64, String> {
1467 let (ack_id, ack_rx) = self.inner.subscription_manager.register_ack();
1468 msg.set_subscription_id(ack_id);
1469
1470 if let Err(e) = self.send_control_message(msg).await {
1471 self.inner.subscription_manager.cancel_ack(ack_id);
1472 return Err(format!("datapath send error: {}", e.chain()));
1473 }
1474
1475 match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1476 Ok(Ok(Ok(()))) => Ok(ack_id),
1477 Ok(Ok(Err(err))) => Err(err.to_string()),
1478 Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1479 Err(_) => {
1480 self.inner.subscription_manager.cancel_ack(ack_id);
1481 Err("subscription ack timed out".to_string())
1482 }
1483 }
1484 }
1485
1486 async fn send_unsubscribe_message_with_ack(
1488 &self,
1489 mut msg: DataPlaneMessage,
1490 subscription_id: u64,
1491 ) -> Result<(), String> {
1492 let ack_rx = self
1493 .inner
1494 .subscription_manager
1495 .register_ack_with_id(subscription_id);
1496 msg.set_subscription_id(subscription_id);
1497
1498 if let Err(e) = self.send_control_message(msg).await {
1499 self.inner.subscription_manager.cancel_ack(subscription_id);
1500 return Err(format!("datapath send error: {}", e.chain()));
1501 }
1502
1503 match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1504 Ok(Ok(Ok(()))) => Ok(()),
1505 Ok(Ok(Err(err))) => Err(err.to_string()),
1506 Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1507 Err(_) => {
1508 self.inner.subscription_manager.cancel_ack(subscription_id);
1509 Err("subscription ack timed out".to_string())
1510 }
1511 }
1512 }
1513
1514 async fn send_control_message(&self, msg: DataPlaneMessage) -> Result<(), ControllerError> {
1516 self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
1517 error!(error = %e.chain(), "error sending message into datapath");
1518 ControllerError::Datapath(slim_datapath::errors::DataPathError::ConnectionError)
1519 })
1520 }
1521
1522 async fn send_or_queue_notification(&self, ctrl_msg: ControlMessage, clients: &[ClientConfig]) {
1529 let mut sent = false;
1530
1531 for c in clients {
1532 let tx = match self.inner.tx_channels.read().get(&c.endpoint) {
1533 Some(tx) => tx.clone(),
1534 None => continue,
1535 };
1536
1537 match tx.try_send(Ok(ctrl_msg.clone())) {
1538 Ok(()) => {
1539 sent = true;
1540 }
1541 Err(mpsc::error::TrySendError::Full(_)) => {
1542 debug!(
1543 endpoint = %c.endpoint,
1544 "channel full, queuing notification instead of blocking"
1545 );
1546 }
1547 Err(mpsc::error::TrySendError::Closed(_)) => {
1548 debug!(
1549 endpoint = %c.endpoint,
1550 "channel closed, queuing notification"
1551 );
1552 }
1553 }
1554 }
1555
1556 if !sent {
1557 let mut queue = self.inner.pending_notifications.lock();
1558 if queue.len() >= MAX_QUEUED_NOTIFICATIONS {
1559 queue.pop_front();
1560 debug!("queue full, removed oldest notification");
1561 }
1562 queue.push_back(ctrl_msg);
1563 }
1564 }
1565
1566 fn drain_watch(&self) -> Result<drain::Watch, ControllerError> {
1568 self.inner
1569 .drain_watch
1570 .read()
1571 .clone()
1572 .ok_or(ControllerError::AlreadyStopped)
1573 }
1574
1575 async fn send_queued_notifications(
1577 &self,
1578 tx: &mpsc::Sender<Result<ControlMessage, Status>>,
1579 endpoint: &str,
1580 ) {
1581 let notifications = {
1582 let mut queue = self.inner.pending_notifications.lock();
1583 if queue.is_empty() {
1584 return;
1585 }
1586 queue.drain(..).collect::<Vec<_>>()
1587 };
1588
1589 if notifications.is_empty() {
1590 return;
1591 }
1592
1593 debug!(
1594 "sending {} queued subscription notifications to {}",
1595 notifications.len(),
1596 endpoint
1597 );
1598
1599 let mut failed_notifications = Vec::new();
1600 for notification in notifications {
1601 if let Err(e) = tx.send(Ok(notification)).await {
1602 error!(
1603 error = %e.chain(),
1604 %endpoint,
1605 "failed to send queued notification to control plane",
1606 );
1607
1608 failed_notifications.push(e.0.unwrap());
1610 }
1611 }
1612
1613 if !failed_notifications.is_empty() {
1615 self.inner
1616 .pending_notifications
1617 .lock()
1618 .extend(failed_notifications);
1619 }
1620 }
1621
/// Spawns the task that drives one control-plane stream and returns its handle.
///
/// * `config` - `Some` when this node dialed the control plane (client side);
///   `None` when the stream was accepted by the local gRPC server. Registration
///   and reconnection only happen when it is `Some`.
/// * `stream` - incoming control messages.
/// * `tx` - sender for outgoing control messages on the same channel.
/// * `cancellation_token` - stops the task when cancelled.
///
/// The spawned task, in order:
/// 1. snapshots the current connections and routes and, if `config` is `Some`,
///    sends them in a `RegisterNodeRequest`;
/// 2. flushes queued notifications via `send_queued_notifications`;
/// 3. if `config` is `Some`, waits up to 10 seconds for a `RegisterNodeResponse`
///    and applies any connections/routes it carries as a reconciling
///    `ConfigCommand`;
/// 4. processes incoming messages until the stream errors or ends, the token is
///    cancelled, or the drain signal fires;
/// 5. if the stream errored or ended and `config` is `Some`, calls `connect`
///    again and stores the new sender in `tx_channels`.
///
/// # Errors
///
/// Returns the error from `drain_watch` (no drain watch stored) without spawning.
fn process_control_message_stream(
    &self,
    config: Option<ClientConfig>,
    mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
    tx: mpsc::Sender<Result<ControlMessage, Status>>,
    cancellation_token: CancellationToken,
) -> Result<JoinHandle<()>, ControllerError> {
    let this = self.clone();
    let watch = self.drain_watch()?;

    let handle = tokio::spawn(async move {
        // Label used in logs; server-accepted streams have no config.
        let endpoint = config
            .as_ref()
            .map(|c| c.endpoint.clone())
            .unwrap_or_else(|| "unknown".to_string());
        info!(%endpoint, "connected to control plane");

        // Set only when the stream errors or ends; cancellation/drain leave it false.
        let mut retry_connect = false;

        // Snapshot every entry of the connection table for the register request.
        let mut active_connections = Vec::new();
        this.inner
            .message_processor
            .connection_table()
            .for_each(|id, conn| {
                active_connections.push(v1::ConnectionEntry {
                    id,
                    connection_type: v1::ConnectionType::Remote as i32,
                    // Fall back to an empty JSON object when there is no config
                    // data or it cannot be serialized.
                    config_data: match conn.config_data() {
                        Some(data) => {
                            serde_json::to_string(data).unwrap_or_else(|_| "{}".to_string())
                        }
                        None => "{}".to_string(),
                    },
                    link_id: conn.link_id(),
                    direction: if conn.is_outgoing() {
                        v1::ConnectionDirection::Outgoing as i32
                    } else {
                        v1::ConnectionDirection::Incoming as i32
                    },
                    peer_node_id: conn.peer_node_id().map(str::to_string),
                });
            });

        // Snapshot the tracked routes, translating connection ids back to link ids.
        let active_routes = {
            // Invert link_id -> conn_id so routes keyed by conn_id can report a link id.
            let conn_id_to_link_id: HashMap<u64, String> = this
                .inner
                .link_id_to_conn_id
                .read()
                .iter()
                .map(|(lid, cid)| (*cid, lid.clone()))
                .collect();
            this.inner
                .route_subscription_ids
                .lock()
                .iter()
                .map(|((name, conn_id), _sub_id)| v1::Route {
                    name: Some(name.clone()),
                    link_id: conn_id_to_link_id.get(conn_id).cloned(),
                    direction: None,
                })
                .collect::<Vec<_>>()
        };

        let register_request = ControlMessage {
            message_id: uuid::Uuid::new_v4().to_string(),
            payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
                node_id: this.inner.id.clone(),
                group_name: this.inner.group_name.clone(),
                connection_details: this.inner.connection_details.clone(),
                connections: active_connections,
                routes: active_routes,
            })),
        };

        // Only the dialing side registers; a failed send ends the task.
        if config.is_some()
            && let Err(e) = tx.send(Ok(register_request)).await
        {
            error!(error = %e.chain(), "failed to send register request");
            return;
        }

        // Flush notifications that were queued while no channel accepted them.
        this.send_queued_notifications(&tx, &endpoint).await;

        if config.is_some() {
            // Registration handshake: wait (bounded by a 10 second timeout) for the
            // RegisterNodeResponse. Any other message received meanwhile is
            // dropped without being handled.
            let registration_result = tokio::time::timeout(
                Duration::from_secs(10),
                async {
                    loop {
                        tokio::select! {
                            next = stream.next() => {
                                match next {
                                    Some(Ok(msg)) => {
                                        if let Some(Payload::RegisterNodeResponse(resp)) = msg.payload {
                                            if resp.success {
                                                info!(
                                                    connections = resp.connections.len(),
                                                    routes = resp.routes.len(),
                                                    "registration acknowledged by control plane"
                                                );
                                                return Ok(resp);
                                            } else {
                                                return Err("registration rejected by control plane".to_string());
                                            }
                                        }
                                    }
                                    Some(Err(_)) => return Err("stream error waiting for registration response".to_string()),
                                    None => return Err("stream closed before registration response".to_string()),
                                }
                            }
                            _ = cancellation_token.cancelled() => {
                                return Err("cancelled while waiting for registration response".to_string());
                            }
                        }
                    }
                }
            ).await;

            match registration_result {
                Ok(Ok(resp)) => {
                    // Apply the state returned by the control plane as a
                    // reconciling configuration command.
                    if !resp.connections.is_empty() || !resp.routes.is_empty() {
                        let init_cmd = ControlMessage {
                            message_id: uuid::Uuid::new_v4().to_string(),
                            payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
                                connections_to_create: resp.connections,
                                routes_to_set: resp.routes,
                                routes_to_delete: vec![],
                                connections_to_delete: vec![],
                                reconcile: true,
                            })),
                        };
                        if let Err(e) = this.handle_new_control_message(init_cmd, &tx).await {
                            error!(error = %e.chain(), "failed to apply initial state from registration");
                        }
                    }
                }
                // Handshake failure or timeout ends the task without a reconnect attempt.
                Ok(Err(e)) => {
                    error!(%e, "registration handshake failed");
                    return;
                }
                Err(_) => {
                    error!("registration handshake timed out");
                    return;
                }
            }
        }

        // Pinned so the same drain future can be polled on every loop iteration.
        let mut drain_fut = std::pin::pin!(watch.clone().signaled());

        // Main loop: handle incoming messages until the stream fails/ends or a
        // shutdown signal (cancellation token or drain) arrives.
        loop {
            tokio::select! {
                next = stream.next() => {
                    match next {
                        Some(Ok(msg)) => {
                            // A handler error is logged; the loop keeps running.
                            if let Err(e) = this.handle_new_control_message(msg, &tx).await {
                                error!(error = %e.chain(), "error processing incoming control message");
                            }
                        }
                        Some(Err(e)) => {
                            // A broken pipe is reported at info level; everything
                            // else is logged as an error.
                            if let Some(io_err) = Self::match_for_io_error(&e) {
                                if io_err.kind() == std::io::ErrorKind::BrokenPipe {
                                    info!("connection closed by peer");
                                } else {
                                    error!(
                                        error = %e.chain(),
                                        io_error_kind = ?io_err.kind(),
                                        "IO error receiving control messages"
                                    );
                                }
                            } else {
                                error!(error = %e.chain(), "error receiving control messages");
                            }

                            retry_connect = true;
                            break;
                        }
                        None => {
                            debug!("end of stream");
                            retry_connect = true;
                            break;
                        }
                    }
                }
                _ = cancellation_token.cancelled() => {
                    debug!("shutting down stream on cancellation token");
                    break;
                }
                _ = &mut drain_fut => {
                    debug!("shutting down stream on drain");
                    break;
                }
            }
        }

        info!(%endpoint, "control plane stream closed");

        // Reconnect only when the stream itself failed and we were the dialing side.
        if retry_connect && let Some(config) = config {
            info!(%config.endpoint, "retrying connection to control plane");
            this.connect(config.clone(), cancellation_token)
                .await
                .map_or_else(
                    |e| {
                        error!(error = %e.chain(), "failed to reconnect to control plane");
                    },
                    |tx| {
                        info!(%config.endpoint, "reconnected to control plane");

                        // Replace the sender stored for this endpoint with the new one.
                        this.inner
                            .tx_channels
                            .write()
                            .insert(config.endpoint.clone(), tx);
                    },
                )
        }
    });

    Ok(handle)
}
1848
1849 async fn connect(
1853 &self,
1854 config: ClientConfig,
1855 cancellation_token: CancellationToken,
1856 ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
1857 info!(%config.endpoint, "connecting to control plane");
1858
1859 self.inner.route_subscription_ids.lock().clear();
1863 self.inner.link_id_to_conn_id.write().clear();
1864
1865 let watch = self.drain_watch()?;
1867
1868 let connect_fut = async {
1869 let channel = match config.to_channel().await? {
1870 TransportChannel::Grpc(c) => c,
1871 TransportChannel::Websocket(_) => {
1872 return Err(ControllerError::ConfigError(
1873 slim_config::errors::ConfigError::GrpcChannelUnsupportedTransport,
1874 ));
1875 }
1876 };
1877
1878 let mut client = ControllerServiceClient::new(channel.clone());
1879 let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1880 let out_stream = ReceiverStream::new(rx).filter_map(|res| match res {
1881 Ok(msg) => Some(msg),
1882 Err(e) => {
1883 error!(error = %e, "dropping outbound control message due to error");
1884 None
1885 }
1886 });
1887 let stream = client
1888 .open_control_channel(Request::new(out_stream))
1889 .await?;
1890 Ok((tx, stream))
1891 };
1892
1893 let (tx, stream) = tokio::select! {
1894 result = connect_fut => { result? }
1895 _ = cancellation_token.cancelled() => {
1896 debug!("connection cancelled during setup");
1897 return Err(ControllerError::Canceled);
1898 }
1899 _ = watch.signaled() => {
1900 debug!("drain signal received during connection setup");
1901 return Err(ControllerError::Canceled);
1902 }
1903 };
1904
1905 let endpoint_key = config.endpoint.clone();
1907 let handle = self.process_control_message_stream(
1908 Some(config),
1909 stream.into_inner(),
1910 tx.clone(),
1911 cancellation_token.clone(),
1912 )?;
1913 self.inner
1914 .stream_handles
1915 .lock()
1916 .insert(endpoint_key, handle);
1917
1918 Ok(tx)
1920 }
1921
1922 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1923 let mut err: &(dyn std::error::Error + 'static) = err_status;
1924
1925 loop {
1926 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1927 return Some(io_err);
1928 }
1929
1930 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1933 && let Some(io_err) = h2_err.get_io()
1934 {
1935 return Some(io_err);
1936 }
1937
1938 err = err.source()?;
1939 }
1940 }
1941}
1942
1943#[tonic::async_trait]
1944impl GrpcControllerService for ControllerService {
1945 type OpenControlChannelStream =
1946 Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
1947
1948 async fn open_control_channel(
1949 &self,
1950 request: Request<tonic::Streaming<ControlMessage>>,
1951 ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
1952 let remote_endpoint = request
1954 .remote_addr()
1955 .map(|addr| addr.to_string())
1956 .unwrap_or_else(|| "unknown".to_string());
1957
1958 let stream = request.into_inner();
1959 let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1960
1961 let cancellation_token = CancellationToken::new();
1962
1963 let handle = self
1965 .process_control_message_stream(None, stream, tx.clone(), cancellation_token.clone())
1966 .map_err(|e| {
1967 error!(error = %e.chain(), "error processing control message stream");
1968 Status::unavailable("failed to process control message stream")
1969 })?;
1970 self.inner
1971 .stream_handles
1972 .lock()
1973 .insert(remote_endpoint.clone(), handle);
1974
1975 self.inner
1976 .tx_channels
1977 .write()
1978 .insert(remote_endpoint.clone(), tx);
1979
1980 if let Some(old_token) = self
1981 .inner
1982 .cancellation_tokens
1983 .write()
1984 .insert(remote_endpoint.clone(), cancellation_token)
1985 {
1986 old_token.cancel();
1987 }
1988
1989 let out_stream = ReceiverStream::new(rx);
1990 Ok(Response::new(
1991 Box::pin(out_stream) as Self::OpenControlChannelStream
1992 ))
1993 }
1994}
1995
1996#[cfg(test)]
1997mod tests {
1998 use super::*;
1999 use tracing_test::traced_test;
2000
2001 async fn setup_control_planes(
2002 server_endpoint: &str,
2003 server_name: &str,
2004 client_name: &str,
2005 ) -> (ControlPlane, ControlPlane, ClientConfig) {
2006 let server_config = ServerConfig::with_endpoint(server_endpoint)
2007 .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
2008 let client_config = ClientConfig::with_endpoint(&format!("http://{}", server_endpoint))
2009 .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());
2010
2011 let message_processor_server = MessageProcessor::new();
2012 let message_processor_client = MessageProcessor::new();
2013
2014 let control_plane_server = ControlPlane::new(ControlPlaneSettings {
2015 id: server_name.to_string(),
2016 group_name: None,
2017 servers: vec![server_config.clone()],
2018 clients: vec![],
2019 message_processor: message_processor_server,
2020 connection_details: vec![from_server_config(&server_config)],
2021 });
2022
2023 let control_plane_client = ControlPlane::new(ControlPlaneSettings {
2024 id: client_name.to_string(),
2025 group_name: None,
2026 servers: vec![],
2027 clients: vec![client_config.clone()],
2028 message_processor: message_processor_client,
2029 connection_details: vec![],
2030 });
2031
2032 (control_plane_server, control_plane_client, client_config)
2033 }
2034
2035 #[tokio::test]
2036 #[traced_test]
2037 async fn test_end_to_end() {
2038 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2039 setup_control_planes(
2040 "127.0.0.1:50051",
2041 "test-server-instance",
2042 "test-client-instance",
2043 )
2044 .await;
2045
2046 control_plane_server.run().await.unwrap();
2047 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2048 control_plane_client.run().await.unwrap();
2049 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2050
2051 assert!(logs_contain("received a register node request"));
2052 }
2053
2054 #[tokio::test]
2055 #[traced_test]
2056 async fn test_subscription_notification_queue_drain() {
2057 let (mut control_plane_server, mut control_plane_client, client_config) =
2059 setup_control_planes(
2060 "127.0.0.1:50061",
2061 "queue-drain-server",
2062 "queue-drain-client",
2063 )
2064 .await;
2065
2066 let controller = control_plane_client.controller.clone();
2067 assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2068
2069 const N: usize = 5;
2070 for i in 0..N {
2071 let ctrl_msg = ControlMessage {
2072 message_id: uuid::Uuid::new_v4().to_string(),
2073 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2074 connections_to_create: vec![],
2075 connections_to_delete: vec![],
2076 routes_to_set: vec![v1::Route {
2077 name: Some(
2078 ProtoName::from_strings(["queued", "sub", &format!("name-{i}")])
2079 .with_id(i as u128),
2080 ),
2081 link_id: None,
2082 direction: None,
2083 }],
2084 routes_to_delete: vec![],
2085 reconcile: false,
2086 })),
2087 };
2088 controller
2089 .send_or_queue_notification(ctrl_msg, std::slice::from_ref(&client_config))
2090 .await;
2091 }
2092 assert_eq!(controller.inner.pending_notifications.lock().len(), N);
2093
2094 control_plane_server.run().await.expect("server run failed");
2095 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2096 control_plane_client.run().await.expect("client run failed");
2097 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2098
2099 assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2100 assert!(
2101 logs_contain(&format!("sending {} queued subscription notifications", N)),
2102 "Expected log about sending queued subscription notifications"
2103 );
2104
2105 drop(controller);
2106 drop(control_plane_server);
2107 drop(control_plane_client);
2108 }
2109
2110 #[tokio::test]
2111 #[traced_test]
2112 async fn test_delete_connection_by_link_id_success_ack() {
2113 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2114 setup_control_planes(
2115 "127.0.0.1:50081",
2116 "delete-linkid-server",
2117 "delete-linkid-client",
2118 )
2119 .await;
2120
2121 control_plane_server.run().await.unwrap();
2122 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2123 control_plane_client.run().await.unwrap();
2124 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2125
2126 let controller = control_plane_client.controller.clone();
2127 let link_id = "test-delete-link-id".to_string();
2128
2129 let (tx, _rx) = tokio::sync::mpsc::channel(16);
2131 let conn = slim_datapath::connection::Connection::new(
2132 slim_datapath::tables::ConnType::Remote,
2133 slim_datapath::connection::Channel::Server(tx),
2134 )
2135 .with_negotiation(&link_id, "1.0.0");
2136 controller
2137 .inner
2138 .message_processor
2139 .forwarder()
2140 .on_connection_established(conn, None)
2141 .unwrap();
2142
2143 let ctrl_msg = ControlMessage {
2144 message_id: uuid::Uuid::new_v4().to_string(),
2145 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2146 connections_to_create: vec![],
2147 connections_to_delete: vec![link_id.clone()],
2148 routes_to_set: vec![],
2149 routes_to_delete: vec![],
2150 reconcile: false,
2151 })),
2152 };
2153 let (tx, mut rx) = mpsc::channel(1);
2154 controller
2155 .handle_new_control_message(ctrl_msg, &tx)
2156 .await
2157 .expect("config command must be handled");
2158
2159 let ack_msg = rx
2160 .recv()
2161 .await
2162 .expect("expected ack message")
2163 .expect("ack should be ok");
2164 let ack = match ack_msg.payload {
2165 Some(Payload::ConfigCommandAck(ack)) => ack,
2166 _ => panic!("expected ConfigCommandAck payload"),
2167 };
2168 assert_eq!(ack.connections_status.len(), 1);
2169 assert_eq!(ack.connections_status[0].link_id, link_id);
2170 assert!(ack.connections_status[0].success);
2171 }
2172
2173 #[tokio::test]
2174 #[traced_test]
2175 async fn test_delete_connection_by_link_id_unknown_fails_ack() {
2176 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2177 "127.0.0.1:50082",
2178 "delete-linkid-server-unknown",
2179 "delete-linkid-client-unknown",
2180 )
2181 .await;
2182
2183 let controller = control_plane_client.controller.clone();
2184 let ctrl_msg = ControlMessage {
2185 message_id: uuid::Uuid::new_v4().to_string(),
2186 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2187 connections_to_create: vec![],
2188 connections_to_delete: vec!["unknown-link-id".to_string()],
2189 routes_to_set: vec![],
2190 routes_to_delete: vec![],
2191 reconcile: false,
2192 })),
2193 };
2194 let (tx, mut rx) = mpsc::channel(1);
2195 controller
2196 .handle_new_control_message(ctrl_msg, &tx)
2197 .await
2198 .expect("config command must be handled");
2199
2200 let ack_msg = rx
2201 .recv()
2202 .await
2203 .expect("expected ack message")
2204 .expect("ack should be ok");
2205 let ack = match ack_msg.payload {
2206 Some(Payload::ConfigCommandAck(ack)) => ack,
2207 _ => panic!("expected ConfigCommandAck payload"),
2208 };
2209 assert_eq!(ack.connections_status.len(), 1);
2210 assert_eq!(ack.connections_status[0].link_id, "unknown-link-id");
2211 assert!(!ack.connections_status[0].success);
2212 assert!(ack.connections_status[0].error_msg.contains("not found"));
2213
2214 drop(control_plane_server);
2215 }
2216
2217 #[tokio::test]
2218 #[traced_test]
2219 async fn test_create_connection_with_existing_link_id_reuses_connection_ack() {
2220 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2221 setup_control_planes(
2222 "127.0.0.1:50083",
2223 "create-linkid-server",
2224 "create-linkid-client",
2225 )
2226 .await;
2227
2228 control_plane_server.run().await.unwrap();
2229 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2230 control_plane_client.run().await.unwrap();
2231 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2232
2233 let controller = control_plane_client.controller.clone();
2234 let link_id = "test-create-link-id".to_string();
2235
2236 let (tx, _rx) = tokio::sync::mpsc::channel(16);
2238 let conn = slim_datapath::connection::Connection::new(
2239 slim_datapath::tables::ConnType::Remote,
2240 slim_datapath::connection::Channel::Server(tx),
2241 )
2242 .with_negotiation(&link_id, "1.0.0");
2243 controller
2244 .inner
2245 .message_processor
2246 .forwarder()
2247 .on_connection_established(conn, None)
2248 .unwrap();
2249
2250 let endpoint = "http://127.0.0.1:59999";
2251 let connection_config = serde_json::json!({
2252 "endpoint": endpoint,
2253 "link_id": link_id
2254 })
2255 .to_string();
2256
2257 let ctrl_msg = ControlMessage {
2258 message_id: uuid::Uuid::new_v4().to_string(),
2259 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2260 connections_to_create: vec![v1::Connection {
2261 link_id: "reuse-existing-link".to_string(),
2262 config_data: connection_config,
2263 }],
2264 connections_to_delete: vec![],
2265 routes_to_set: vec![],
2266 routes_to_delete: vec![],
2267 reconcile: false,
2268 })),
2269 };
2270
2271 let (tx, mut rx) = mpsc::channel(1);
2272 controller
2273 .handle_new_control_message(ctrl_msg, &tx)
2274 .await
2275 .expect("config command must be handled");
2276
2277 let ack_msg = rx
2278 .recv()
2279 .await
2280 .expect("expected ack message")
2281 .expect("ack should be ok");
2282 let ack = match ack_msg.payload {
2283 Some(Payload::ConfigCommandAck(ack)) => ack,
2284 _ => panic!("expected ConfigCommandAck payload"),
2285 };
2286 assert_eq!(ack.connections_status.len(), 1);
2287 assert_eq!(ack.connections_status[0].link_id, "reuse-existing-link");
2288 assert!(ack.connections_status[0].success);
2289
2290 assert!(
2291 controller
2292 .inner
2293 .link_id_to_conn_id
2294 .read()
2295 .contains_key(&link_id),
2296 "expected link_id to be mapped to reused connection id"
2297 );
2298 }
2299
2300 #[tokio::test]
2301 #[traced_test]
2302 async fn test_subscription_set_unknown_link_id_fails_ack() {
2303 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2304 "127.0.0.1:50084",
2305 "sub-linkid-server-unknown",
2306 "sub-linkid-client-unknown",
2307 )
2308 .await;
2309
2310 let controller = control_plane_client.controller.clone();
2311 let ctrl_msg = ControlMessage {
2312 message_id: uuid::Uuid::new_v4().to_string(),
2313 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2314 connections_to_create: vec![],
2315 connections_to_delete: vec![],
2316 routes_to_set: vec![v1::Route {
2317 name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2318 link_id: Some("missing-link-id".to_string()),
2319 direction: None,
2320 }],
2321 routes_to_delete: vec![],
2322 reconcile: false,
2323 })),
2324 };
2325 let (tx, mut rx) = mpsc::channel(1);
2326 controller
2327 .handle_new_control_message(ctrl_msg, &tx)
2328 .await
2329 .expect("config command must be handled");
2330
2331 let ack_msg = rx
2332 .recv()
2333 .await
2334 .expect("expected ack message")
2335 .expect("ack should be ok");
2336 let ack = match ack_msg.payload {
2337 Some(Payload::ConfigCommandAck(ack)) => ack,
2338 _ => panic!("expected ConfigCommandAck payload"),
2339 };
2340
2341 assert_eq!(ack.routes_status.len(), 1);
2342 assert!(!ack.routes_status[0].success);
2343 assert!(
2344 ack.routes_status[0]
2345 .error_msg
2346 .contains("Connection with link_id missing-link-id not found")
2347 );
2348
2349 drop(control_plane_server);
2350 }
2351
2352 #[tokio::test]
2353 #[traced_test]
2354 async fn test_create_connection_invalid_config_fails_ack() {
2355 let (_control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2356 "127.0.0.1:50085",
2357 "create-invalid-config-server",
2358 "create-invalid-config-client",
2359 )
2360 .await;
2361
2362 let controller = control_plane_client.controller.clone();
2363 let ctrl_msg = ControlMessage {
2364 message_id: uuid::Uuid::new_v4().to_string(),
2365 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2366 connections_to_create: vec![v1::Connection {
2367 link_id: "invalid-config-conn".to_string(),
2368 config_data: "{invalid-json".to_string(),
2369 }],
2370 connections_to_delete: vec![],
2371 routes_to_set: vec![],
2372 routes_to_delete: vec![],
2373 reconcile: false,
2374 })),
2375 };
2376 let (tx, mut rx) = mpsc::channel(1);
2377 controller
2378 .handle_new_control_message(ctrl_msg, &tx)
2379 .await
2380 .expect("config command must be handled");
2381
2382 let ack_msg = rx
2383 .recv()
2384 .await
2385 .expect("expected ack message")
2386 .expect("ack should be ok");
2387 let ack = match ack_msg.payload {
2388 Some(Payload::ConfigCommandAck(ack)) => ack,
2389 _ => panic!("expected ConfigCommandAck payload"),
2390 };
2391 assert_eq!(ack.connections_status.len(), 1);
2392 assert!(!ack.connections_status[0].success);
2393 assert!(
2394 ack.connections_status[0]
2395 .error_msg
2396 .contains("Failed to parse config")
2397 );
2398 }
2399
2400 #[tokio::test]
2401 #[traced_test]
2402 async fn test_subscription_delete_unknown_link_id_fails_ack() {
2403 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2404 "127.0.0.1:50086",
2405 "sub-del-linkid-server-unknown",
2406 "sub-del-linkid-client-unknown",
2407 )
2408 .await;
2409
2410 let controller = control_plane_client.controller.clone();
2411 let ctrl_msg = ControlMessage {
2412 message_id: uuid::Uuid::new_v4().to_string(),
2413 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2414 connections_to_create: vec![],
2415 connections_to_delete: vec![],
2416 routes_to_set: vec![],
2417 routes_to_delete: vec![v1::Route {
2418 name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2419 link_id: Some("missing-link-id-delete".to_string()),
2420 direction: None,
2421 }],
2422 reconcile: false,
2423 })),
2424 };
2425 let (tx, mut rx) = mpsc::channel(1);
2426 controller
2427 .handle_new_control_message(ctrl_msg, &tx)
2428 .await
2429 .expect("config command must be handled");
2430
2431 let ack_msg = rx
2432 .recv()
2433 .await
2434 .expect("expected ack message")
2435 .expect("ack should be ok");
2436 let ack = match ack_msg.payload {
2437 Some(Payload::ConfigCommandAck(ack)) => ack,
2438 _ => panic!("expected ConfigCommandAck payload"),
2439 };
2440
2441 assert_eq!(ack.routes_status.len(), 1);
2442 assert!(!ack.routes_status[0].success);
2443 assert!(
2444 ack.routes_status[0]
2445 .error_msg
2446 .contains("Connection with link_id missing-link-id-delete not found")
2447 );
2448
2449 drop(control_plane_server);
2450 }
2451
2452 #[tokio::test]
2453 #[traced_test]
2454 async fn test_shutdown_drains_resources() {
2455 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2457 setup_control_planes(
2458 "127.0.0.1:50071",
2459 "shutdown-server-instance",
2460 "shutdown-client-instance",
2461 )
2462 .await;
2463
2464 control_plane_server
2466 .run()
2467 .await
2468 .expect("server should start");
2469 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2470 control_plane_client
2471 .run()
2472 .await
2473 .expect("client should start");
2474 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2475
2476 let server_tokens_before = control_plane_server
2478 .controller
2479 .inner
2480 .cancellation_tokens
2481 .read()
2482 .len();
2483 assert!(
2484 server_tokens_before > 0,
2485 "expected server to have active cancellation tokens before shutdown"
2486 );
2487
2488 let client_tokens_before = control_plane_client
2489 .controller
2490 .inner
2491 .cancellation_tokens
2492 .read()
2493 .len();
2494 assert!(
2495 client_tokens_before > 0,
2496 "expected client to have active cancellation tokens before shutdown"
2497 );
2498
2499 control_plane_client
2501 .shutdown()
2502 .await
2503 .expect("client shutdown ok");
2504 control_plane_server
2505 .shutdown()
2506 .await
2507 .expect("server shutdown ok");
2508
2509 let server_tokens_after = control_plane_server
2511 .controller
2512 .inner
2513 .cancellation_tokens
2514 .read()
2515 .len();
2516 assert_eq!(
2517 server_tokens_after, 0,
2518 "expected server cancellation tokens to be drained after shutdown"
2519 );
2520
2521 let client_tokens_after = control_plane_client
2522 .controller
2523 .inner
2524 .cancellation_tokens
2525 .read()
2526 .len();
2527 assert_eq!(
2528 client_tokens_after, 0,
2529 "expected client cancellation tokens to be drained after shutdown"
2530 );
2531
2532 assert!(
2534 control_plane_server.shutdown().await.is_err(),
2535 "second shutdown on server should return an error"
2536 );
2537 assert!(
2538 control_plane_client.shutdown().await.is_err(),
2539 "second shutdown on client should return an error"
2540 );
2541 }
2542
2543 #[tokio::test]
2544 #[traced_test]
2545 async fn test_shutdown_without_run() {
2546 let (control_plane_server, mut _control_plane_client, _client_cfg) = setup_control_planes(
2548 "127.0.0.1:50072",
2549 "shutdown-no-run-server",
2550 "shutdown-no-run-client",
2551 )
2552 .await;
2553
2554 assert_eq!(
2556 control_plane_server
2557 .controller
2558 .inner
2559 .cancellation_tokens
2560 .read()
2561 .len(),
2562 0,
2563 "expected zero cancellation tokens before shutdown when not run"
2564 );
2565
2566 control_plane_server
2568 .shutdown()
2569 .await
2570 .expect("shutdown without prior run should succeed");
2571
2572 assert_eq!(
2574 control_plane_server
2575 .controller
2576 .inner
2577 .cancellation_tokens
2578 .read()
2579 .len(),
2580 0,
2581 "expected zero cancellation tokens after shutdown when not run"
2582 );
2583
2584 assert!(
2586 control_plane_server.shutdown().await.is_err(),
2587 "second shutdown should error due to missing drain signal"
2588 );
2589 }
2590}