1use std::collections::{HashMap, VecDeque};
5use std::pin::Pin;
6use std::sync::Arc;
7
8use std::time::Duration;
9
10use display_error_chain::ErrorChainExt;
11use slim_config::component::id::ID;
12use slim_config::server::ServerConfig;
13use slim_session::subscription_manager::SubscriptionManager;
14use tokio::sync::mpsc;
15use tokio::task::JoinHandle;
16use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
17use tokio_util::sync::CancellationToken;
18use tonic::{Request, Response, Status};
19use tracing::{debug, error, info};
20
21use crate::api::proto::api::v1::control_message::Payload;
22use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
23use crate::api::proto::api::v1::{
24 self, ConnectionDetails, ConnectionDirection, ConnectionListResponse, ConnectionType,
25 RouteListResponse,
26};
27use crate::api::proto::api::v1::{
28 ConnectionEntry, ControlMessage, RouteEntry,
29 controller_service_client::ControllerServiceClient,
30 controller_service_server::ControllerService as GrpcControllerService,
31};
32use crate::errors::ControllerError;
33use prost_types::Struct;
34use slim_config::client::{ClientConfig, TransportChannel};
35use slim_datapath::api::{
36 MessageType::Subscribe, MessageType::SubscriptionAck as SubscriptionAckType,
37 MessageType::Unsubscribe, ProtoMessage as DataPlaneMessage,
38};
39use slim_datapath::api::{NameId, ProtoName};
40use slim_datapath::message_processing::MessageProcessor;
41use slim_datapath::messages::utils::SlimHeaderFlags;
42use slim_datapath::tables::SubscriptionTable;
43
/// Sender half of a channel carrying control messages (or a gRPC `Status` error).
type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
/// Senders keyed by a `String`; `ControlPlane::run_client` inserts entries under
/// the client endpoint.
type TxChannels = HashMap<String, TxChannel>;

// NOTE(review): not referenced in the visible part of this file; presumably the
// cap on `pending_notifications` — confirm against the code that enqueues them.
const MAX_QUEUED_NOTIFICATIONS: usize = 1000;

/// Upper bound passed to `tokio::time::timeout` while waiting for a
/// subscription ack to arrive on the ack receiver.
const SUBSCRIPTION_ACK_TIMEOUT: Duration = Duration::from_secs(30);
52
/// Construction parameters consumed by [`ControlPlane::new`].
#[derive(Clone)]
pub struct ControlPlaneSettings {
    /// Identifier of this node; its string form is sent as the node id in
    /// deregister requests.
    pub id: ID,
    /// Optional group name, stored on the controller service.
    // NOTE(review): no consumer of this value is visible in this part of the file.
    pub group_name: Option<String>,
    /// Server configurations; `ControlPlane::run` starts one gRPC server per entry.
    pub servers: Vec<ServerConfig>,
    /// Client configurations; `ControlPlane::run` opens one connection per entry.
    pub clients: Vec<ClientConfig>,
    /// Shared data-plane message processor. A local connection is registered on
    /// it at construction time, and its connection table is queried afterwards.
    pub message_processor: Arc<MessageProcessor>,
    /// Connection details stored on the controller service.
    // NOTE(review): presumably advertised to the control plane — confirm.
    pub connection_details: Vec<ConnectionDetails>,
}
70
/// Shared state behind [`ControllerService`]; always held inside an `Arc`.
struct ControllerServiceInternal {
    /// Identifier of this node (sent as the node id when deregistering).
    id: ID,

    /// Optional group name copied from the settings.
    // NOTE(review): usage is not visible in this part of the file.
    group_name: Option<String>,

    /// Data-plane message processor: used to connect/disconnect connections and
    /// to read the connection and subscription tables.
    message_processor: Arc<MessageProcessor>,

    /// Sender obtained from `register_local_connection` on the message processor.
    // NOTE(review): presumably used to push messages into the data plane — confirm.
    tx_slim: mpsc::Sender<Result<DataPlaneMessage, Status>>,

    /// Control-message senders, keyed by client endpoint.
    tx_channels: parking_lot::RwLock<TxChannels>,

    /// Cancellation tokens keyed by endpoint (plus the fixed key `"DATA_PLANE"`
    /// for the data-plane listener). Drained and cancelled on stop/shutdown/drop.
    cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,

    /// Drain watch created together with the drain signal; cleared by `shutdown`.
    drain_watch: parking_lot::RwLock<Option<drain::Watch>>,

    /// Queue of control messages.
    // NOTE(review): producers/consumers are not visible here; presumably
    // notifications buffered while no control-plane stream is available — confirm.
    pending_notifications: Arc<parking_lot::Mutex<VecDeque<ControlMessage>>>,

    /// Tracks outstanding subscription acks (`register_ack`, `cancel_ack`,
    /// `resolve_ack`).
    subscription_manager: SubscriptionManager,

    /// Connection details copied from the settings.
    // NOTE(review): usage is not visible in this part of the file.
    connection_details: Vec<ConnectionDetails>,

    /// Subscription id recorded for each `(route name, connection id)` pair that
    /// was subscribed through this service.
    route_subscription_ids: parking_lot::Mutex<HashMap<(ProtoName, u64), u64>>,

    /// Cache from link id to connection id; entries are validated against the
    /// connection table before being trusted.
    link_id_to_conn_id: parking_lot::RwLock<HashMap<String, u64>>,

    /// Join handles of spawned tasks, keyed by a `String`.
    // NOTE(review): presumably one stream-processing task per endpoint — confirm.
    stream_handles: parking_lot::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
}
116
/// Cloneable handle to the shared controller state. Clones share the same
/// [`ControllerServiceInternal`] through the `Arc`; a clone is handed to each
/// gRPC server started by `ControlPlane::run_server`.
#[derive(Clone)]
struct ControllerService {
    /// Shared controller state.
    inner: Arc<ControllerServiceInternal>,
}
122
/// Owner of the controller service together with the server/client
/// configurations it runs and the drain signal used for graceful shutdown.
pub struct ControlPlane {
    /// Server configurations started by `run`.
    servers: Vec<ServerConfig>,

    /// Client configurations connected by `run`.
    clients: Vec<ClientConfig>,

    /// Drain signal; taken by `shutdown`, so it is `None` once shutdown has run.
    drain_signal: parking_lot::RwLock<Option<drain::Signal>>,

    /// Handle to the shared controller state.
    controller: ControllerService,

    /// Receiver for messages coming from the data plane; taken by `run`, so it is
    /// `None` once the control plane has been started.
    rx_slim_option: Option<mpsc::Receiver<Result<DataPlaneMessage, Status>>>,
}
141
142impl Drop for ControlPlane {
145 fn drop(&mut self) {
146 for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
148 token.cancel();
149 }
150 }
151}
152
153pub(crate) fn from_server_config(server_config: &ServerConfig) -> ConnectionDetails {
154 let mut endpoint = server_config.endpoint.clone();
155 let mut external_endpoint = None;
156 let mut spire_socket_path = None;
157 let mut trust_domain = None;
158 let mut remaining_fields = std::collections::BTreeMap::new();
159
160 if let Some(m) = &server_config.metadata {
161 for (k, v) in &m.inner {
162 match k.as_str() {
163 "local_endpoint" => {
164 if let Some(s) = v.as_str()
165 && !s.is_empty()
166 {
167 endpoint = s.to_string();
168 }
169 }
170 "external_endpoint" => {
171 if let Some(s) = v.as_str()
172 && !s.is_empty()
173 {
174 external_endpoint = Some(s.to_string());
175 }
176 }
177 "spire_socket_path" => {
178 if let Some(s) = v.as_str()
179 && !s.is_empty()
180 {
181 spire_socket_path = Some(s.to_string());
182 }
183 }
184 "trust_domain" => {
185 if let Some(s) = v.as_str()
186 && !s.is_empty()
187 {
188 trust_domain = Some(s.to_string());
189 }
190 }
191 _ => {
192 remaining_fields.insert(k.clone(), prost_types::Value::from(v));
193 }
194 }
195 }
196 }
197
198 let spire_mtls = if !server_config.tls_setting.insecure || spire_socket_path.is_some() {
199 Some(v1::connection_details::SpireMtls {
200 socket_path: spire_socket_path.unwrap_or_default(),
201 trust_domain,
202 })
203 } else {
204 None
205 };
206
207 let metadata = if remaining_fields.is_empty() {
208 None
209 } else {
210 Some(Struct {
211 fields: remaining_fields,
212 })
213 };
214
215 ConnectionDetails {
216 endpoint,
217 external_endpoint,
218 spire_mtls,
219 metadata,
220 }
221}
222
223impl ControlPlane {
225 pub fn new(config: ControlPlaneSettings) -> Self {
238 let (_, tx_slim, rx_slim) = config
240 .message_processor
241 .register_local_connection(true)
242 .unwrap();
243
244 let (signal, watch) = drain::channel();
245
246 ControlPlane {
247 servers: config.servers,
248 clients: config.clients,
249 controller: ControllerService {
250 inner: Arc::new(ControllerServiceInternal {
251 id: config.id,
252 group_name: config.group_name,
253 message_processor: config.message_processor,
254 subscription_manager: SubscriptionManager::new(tx_slim.clone()),
255 tx_slim,
256 tx_channels: parking_lot::RwLock::new(HashMap::new()),
257 cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
258 drain_watch: parking_lot::RwLock::new(Some(watch)),
259 pending_notifications: Arc::new(parking_lot::Mutex::new(VecDeque::new())),
260 connection_details: config.connection_details,
261 route_subscription_ids: parking_lot::Mutex::new(HashMap::new()),
262 link_id_to_conn_id: parking_lot::RwLock::new(HashMap::new()),
263 stream_handles: parking_lot::Mutex::new(HashMap::new()),
264 }),
265 },
266 drain_signal: parking_lot::RwLock::new(Some(signal)),
267 rx_slim_option: Some(rx_slim),
268 }
269 }
270
    /// Builder-style setter: replaces the stored client configurations with
    /// `clients` and returns the control plane.
    pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
        self.clients = clients;
        self
    }
276
    /// Builder-style setter: replaces the stored server configurations with
    /// `servers` and returns the control plane.
    pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
        self.servers = servers;
        self
    }
282
283 pub async fn run(&mut self) -> Result<(), ControllerError> {
290 let rx = self
291 .rx_slim_option
292 .take()
293 .ok_or(ControllerError::AlreadyStarted)?;
294
295 let servers = self.servers.clone();
297 let clients = self.clients.clone();
298
299 for server in servers {
301 self.run_server(server).await?;
302 }
303
304 for client in clients {
306 self.run_client(client).await?;
307 }
308
309 self.listen_from_data_plane(rx).await?;
310
311 Ok(())
312 }
313
314 pub async fn deregister(&self) -> Result<(), ControllerError> {
315 let node_id = self.controller.inner.id.to_string();
316 let deregister_msg = ControlMessage {
317 message_id: uuid::Uuid::new_v4().to_string(),
318 payload: Some(Payload::DeregisterNodeRequest(v1::DeregisterNodeRequest {
319 node: Some(v1::Node { id: node_id }),
320 })),
321 };
322 let channels: Vec<(String, TxChannel)> = self
323 .controller
324 .inner
325 .tx_channels
326 .read()
327 .iter()
328 .map(|(ep, tx)| (ep.clone(), tx.clone()))
329 .collect();
330 for (endpoint, tx) in channels {
331 if let Err(e) = tx.send(Ok(deregister_msg.clone())).await {
332 error!(%endpoint, error = %e, "failed to send deregister request");
333 }
334 }
335 Ok(())
336 }
337
338 pub async fn shutdown(&self) -> Result<(), ControllerError> {
339 let signal = self
341 .drain_signal
342 .write()
343 .take()
344 .ok_or(ControllerError::AlreadyStopped)?;
345
346 self.controller
348 .inner
349 .cancellation_tokens
350 .write()
351 .drain()
352 .for_each(|(endpoint, token)| {
353 info!(%endpoint, "stopping");
354 token.cancel();
355 });
356
357 self.controller.inner.drain_watch.write().take();
359
360 signal.drain().await;
362
363 Ok(())
364 }
365
366 async fn listen_from_data_plane(
367 &mut self,
368 mut rx: mpsc::Receiver<Result<DataPlaneMessage, Status>>,
369 ) -> Result<(), ControllerError> {
370 let cancellation_token = CancellationToken::new();
371 let cancellation_token_clone = cancellation_token.clone();
372
373 self.controller
374 .inner
375 .cancellation_tokens
376 .write()
377 .insert("DATA_PLANE".to_string(), cancellation_token_clone);
378
379 let clients = self.clients.clone();
380 let controller = self.controller.clone();
381
382 let watch = self.controller.drain_watch()?;
384
385 debug!("Starting data plane listener");
386 tokio::spawn(async move {
387 let mut drain_fut = std::pin::pin!(watch.signaled());
388 loop {
389 tokio::select! {
390 next = rx.recv() => {
391 match next {
392 Some(res) => {
393 match res {
394 Ok(msg) => {
395 debug!("Send sub/unsub to control plane for message: {:?}", msg);
396 match msg.get_type() {
397 Subscribe(_) => {
398 controller.handle_subscribe_message(msg.get_dst(), &clients).await;
399 }
400 Unsubscribe(_) => {
401 controller.handle_unsubscribe_message(msg.get_dst(), &clients).await;
402 }
403 SubscriptionAckType(_) => {
404 controller.inner.subscription_manager.resolve_ack(msg.get_subscription_ack());
405 }
406 _ => {
407 debug!("Ignoring unexpected message type from dataplane: {:?}", msg.get_type());
408 }
409 }
410 }
411 Err(e) => {
412 error!(error = %e.chain(), "received error from the data plane");
413 continue;
414 }
415 }
416 }
417 None => {
418 debug!("Data plane receiver channel closed.");
419 break;
420 }
421 }
422 }
423 _ = cancellation_token.cancelled() => {
424 debug!("shutting down stream on cancellation token");
425 break;
426 }
427 _ = &mut drain_fut => {
428 debug!("shutting down stream on drain");
429 break;
430 }
431 }
432 }
433 });
434 Ok(())
435 }
436
437 pub fn stop(&mut self) {
441 info!("stopping controller service");
442
443 for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
445 info!(%endpoint, "stopping");
446 token.cancel();
447 }
448 }
449
450 async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
454 if self
455 .controller
456 .inner
457 .cancellation_tokens
458 .read()
459 .contains_key(&client.endpoint)
460 {
461 return Err(ControllerError::ClientAlreadyRunning(client.endpoint));
462 }
463
464 let cancellation_token = CancellationToken::new();
465
466 let tx = self
467 .controller
468 .connect(client.clone(), cancellation_token.clone())
469 .await?;
470
471 self.controller
473 .inner
474 .cancellation_tokens
475 .write()
476 .insert(client.endpoint.clone(), cancellation_token);
477
478 self.controller
480 .inner
481 .tx_channels
482 .write()
483 .insert(client.endpoint.clone(), tx);
484
485 Ok(())
487 }
488
489 pub async fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
493 if self
495 .controller
496 .inner
497 .cancellation_tokens
498 .read()
499 .contains_key(&config.endpoint)
500 {
501 error!(endpoint = config.endpoint, "server is already running",);
502 return Err(ControllerError::ServerAlreadyRunning(config.endpoint));
503 }
504
505 let token = config
506 .run_grpc_server(
507 &[ControllerServiceServer::new(self.controller.clone())],
508 self.controller.drain_watch()?,
509 )
510 .await?;
511
512 self.controller
514 .inner
515 .cancellation_tokens
516 .write()
517 .insert(config.endpoint.clone(), token.clone());
518
519 info!(%config.endpoint, "started controlplane server");
520
521 Ok(())
522 }
523}
524
525impl ControllerService {
526 fn resolve_connection_by_link_id(&self, link_id: &str) -> Result<Option<u64>, String> {
527 let cached = self.inner.link_id_to_conn_id.read().get(link_id).copied();
528 if let Some(conn_id) = cached {
529 if self
530 .inner
531 .message_processor
532 .connection_table()
533 .get(conn_id)
534 .is_some_and(|conn| conn.link_id().as_deref() == Some(link_id))
535 {
536 return Ok(Some(conn_id));
537 }
538 self.inner.link_id_to_conn_id.write().remove(link_id);
539 }
540
541 let mut resolved: Option<u64> = None;
542 self.inner
543 .message_processor
544 .connection_table()
545 .for_each(|id, conn| {
546 if conn.link_id().as_deref() == Some(link_id) && resolved.is_none() {
547 resolved = Some(id);
548 }
549 });
550
551 if let Some(conn_id) = resolved {
552 self.inner
553 .link_id_to_conn_id
554 .write()
555 .insert(link_id.to_string(), conn_id);
556 }
557
558 Ok(resolved)
559 }
560
561 fn disconnect_connection_by_link_id(&self, link_id: &str) -> Result<(), String> {
562 if link_id.trim().is_empty() {
563 return Err("link_id cannot be empty".to_string());
564 }
565
566 let conn_id = match self.resolve_connection_by_link_id(link_id)? {
567 Some(id) => id,
568 None => {
569 return Err(format!("Connection with link_id {} not found", link_id));
570 }
571 };
572
573 if let Err(e) = self.inner.message_processor.disconnect(conn_id) {
574 info!(
576 link_id = %link_id,
577 conn_id,
578 error = %e,
579 "Disconnect returned an error; continuing delete flow"
580 );
581 }
582
583 self.inner.link_id_to_conn_id.write().remove(link_id);
584 self.inner
585 .route_subscription_ids
586 .lock()
587 .retain(|(_name, cid), _| *cid != conn_id);
588
589 info!(link_id = %link_id, conn_id, "Successfully deleted connection by link_id");
590 Ok(())
591 }
592
593 fn resolve_route_connection(&self, route: &v1::Route) -> Result<Option<u64>, String> {
594 if let Some(link_id) = &route.link_id {
595 let trimmed = link_id.trim();
596 if !trimmed.is_empty() {
597 return self.resolve_connection_by_link_id(trimmed);
598 }
599 }
600
601 Ok(None)
602 }
603
604 async fn diff_connections(
605 &self,
606 desired_connections: &[v1::Connection],
607 ) -> Vec<v1::ConnectionAck> {
608 let mut connections_status = Vec::new();
609
610 let desired_link_ids: std::collections::HashSet<String> = desired_connections
611 .iter()
612 .map(|c| c.link_id.clone())
613 .collect();
614
615 let mut live_outgoing_link_ids: Vec<String> = Vec::new();
616 self.inner
617 .message_processor
618 .connection_table()
619 .for_each(|_id, conn| {
620 if conn.is_outgoing()
621 && let Some(lid) = conn.link_id()
622 && !lid.is_empty()
623 {
624 live_outgoing_link_ids.push(lid);
625 }
626 });
627
628 for link_id in &live_outgoing_link_ids {
629 if desired_link_ids.contains(link_id) {
630 continue;
631 }
632 info!(link_id = %link_id, "desired state: removing connection");
633 let mut success = true;
634 let mut error_msg = String::new();
635 if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
636 success = false;
637 error_msg = err;
638 }
639 connections_status.push(v1::ConnectionAck {
640 link_id: link_id.clone(),
641 success,
642 error_msg,
643 });
644 }
645
646 for conn in desired_connections {
647 let link_id = &conn.link_id;
648 if link_id.is_empty() {
649 continue;
650 }
651
652 let already_exists = match self.resolve_connection_by_link_id(link_id) {
653 Ok(Some(_)) => true,
654 Ok(None) => false,
655 Err(err) => {
656 connections_status.push(v1::ConnectionAck {
657 link_id: link_id.clone(),
658 success: false,
659 error_msg: err,
660 });
661 continue;
662 }
663 };
664
665 if already_exists {
666 connections_status.push(v1::ConnectionAck {
667 link_id: link_id.clone(),
668 success: true,
669 error_msg: String::new(),
670 });
671 continue;
672 }
673
674 info!(?conn, "desired state: creating connection");
675 let mut success = true;
676 let mut error_msg = String::new();
677
678 match serde_json::from_str::<ClientConfig>(&conn.config_data) {
679 Err(e) => {
680 success = false;
681 error_msg = format!("Failed to parse config: {}", e);
682 }
683 Ok(client_config) => {
684 match self
685 .inner
686 .message_processor
687 .connect(client_config.clone(), None, None)
688 .await
689 {
690 Err(e) => {
691 success = false;
692 error_msg = format!("Connection failed: {}", e);
693 }
694 Ok(conn_id) => {
695 let requested_link_id = if client_config.link_id.trim().is_empty() {
696 String::new()
697 } else {
698 client_config.link_id.clone()
699 };
700 if !requested_link_id.is_empty() {
701 self.inner
702 .link_id_to_conn_id
703 .write()
704 .insert(requested_link_id, conn_id.1);
705 }
706 info!(
707 link_id = %link_id,
708 "Successfully created connection"
709 );
710 }
711 }
712 }
713 }
714
715 connections_status.push(v1::ConnectionAck {
716 link_id: link_id.clone(),
717 success,
718 error_msg,
719 });
720 }
721
722 connections_status
723 }
724
725 fn resolve_desired_routes<'a>(
726 &self,
727 desired_routes: &'a [v1::Route],
728 ) -> (HashMap<(ProtoName, u64), &'a v1::Route>, Vec<v1::RouteAck>) {
729 type SubKey = (ProtoName, u64);
730 let mut desired_subs: HashMap<SubKey, &v1::Route> = HashMap::new();
731 let mut failures: Vec<v1::RouteAck> = Vec::new();
732
733 for sub in desired_routes {
734 match self.resolve_route_connection(sub) {
735 Ok(Some(conn_id)) => {
736 let name = sub.name.clone().unwrap();
737 desired_subs.insert((name, conn_id), sub);
738 }
739 Ok(None) => {
740 failures.push(v1::RouteAck {
741 route: Some(sub.clone()),
742 success: false,
743 error_msg: "connection not found".to_string(),
744 });
745 }
746 Err(err) => {
747 failures.push(v1::RouteAck {
748 route: Some(sub.clone()),
749 success: false,
750 error_msg: err,
751 });
752 }
753 }
754 }
755
756 (desired_subs, failures)
757 }
758
759 async fn delete_stale_subscriptions(
760 &self,
761 desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
762 ) -> Vec<v1::RouteAck> {
763 let active_subs: Vec<((ProtoName, u64), u64)> = self
764 .inner
765 .route_subscription_ids
766 .lock()
767 .iter()
768 .map(|((name, conn_id), sub_id)| ((name.clone(), *conn_id), *sub_id))
769 .collect();
770
771 let stale: Vec<_> = active_subs
772 .into_iter()
773 .filter(|((name, conn_id), _)| !desired_subs.contains_key(&(name.clone(), *conn_id)))
774 .collect();
775
776 let futs = stale.iter().map(|((name, conn_id), sub_id)| {
777 let name = name.clone();
778 let conn_id = *conn_id;
779 let sub_id = *sub_id;
780 async move {
781 let conn_alive = self
782 .inner
783 .message_processor
784 .connection_table()
785 .get(conn_id)
786 .is_some();
787
788 let (success, error_msg) = if conn_alive {
789 let source = name.clone().with_id(0);
790 let unsub_msg = DataPlaneMessage::builder()
791 .source(source)
792 .destination(name.clone())
793 .identity("")
794 .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
795 .build_unsubscribe()
796 .unwrap();
797
798 match self
799 .send_unsubscribe_message_with_ack(unsub_msg, sub_id)
800 .await
801 {
802 Ok(()) => (true, String::new()),
803 Err(err) => (false, format!("Failed to unsubscribe: {}", err)),
804 }
805 } else {
806 (true, String::new())
807 };
808
809 (name, conn_id, success, error_msg)
810 }
811 });
812
813 let results = futures::future::join_all(futs).await;
814
815 let mut routes_status = Vec::with_capacity(results.len());
816 for (name, conn_id, success, error_msg) in results {
817 if success {
818 self.inner
819 .route_subscription_ids
820 .lock()
821 .remove(&(name.clone(), conn_id));
822 }
823
824 routes_status.push(v1::RouteAck {
825 route: Some(v1::Route {
826 name: Some(name.clone()),
827 link_id: None,
828 direction: None,
829 }),
830 success,
831 error_msg,
832 });
833 }
834
835 routes_status
836 }
837
838 async fn create_new_subscriptions(
839 &self,
840 desired_subs: &HashMap<(ProtoName, u64), &v1::Route>,
841 ) -> Vec<v1::RouteAck> {
842 let mut routes_status = Vec::new();
843
844 let to_create: Vec<((ProtoName, u64), v1::Route)> = desired_subs
845 .iter()
846 .filter(|((name, conn_id), _)| {
847 !self
848 .inner
849 .route_subscription_ids
850 .lock()
851 .contains_key(&(name.clone(), *conn_id))
852 })
853 .map(|((name, conn_id), sub)| ((name.clone(), *conn_id), (*sub).clone()))
854 .collect();
855
856 for ((name, conn_id), sub) in desired_subs {
858 let dominated = to_create
859 .iter()
860 .any(|((n, c), _)| n == name && *c == *conn_id);
861 if !dominated {
862 routes_status.push(v1::RouteAck {
863 route: Some((*sub).clone()),
864 success: true,
865 error_msg: String::new(),
866 });
867 }
868 }
869
870 let futs = to_create.iter().map(|((name, conn_id), sub)| {
871 let name = name.clone();
872 let conn_id = *conn_id;
873 let sub = sub.clone();
874 async move {
875 let source = name.clone().with_id(0);
876 let sub_msg = DataPlaneMessage::builder()
877 .source(source)
878 .destination(name.clone())
879 .identity("")
880 .flags(SlimHeaderFlags::default().with_recv_from(conn_id))
881 .build_subscribe()
882 .unwrap();
883
884 let result = self.send_subscribe_message_with_ack(sub_msg).await;
885 (name, conn_id, sub, result)
886 }
887 });
888
889 let results = futures::future::join_all(futs).await;
890
891 for (name, conn_id, sub, result) in results {
892 let (success, error_msg) = match result {
893 Ok(subscription_id) => {
894 self.inner
895 .route_subscription_ids
896 .lock()
897 .insert((name, conn_id), subscription_id);
898 info!(?sub, "desired state: created route");
899 (true, String::new())
900 }
901 Err(err) => (false, format!("Failed to subscribe: {}", err)),
902 };
903
904 routes_status.push(v1::RouteAck {
905 route: Some(sub),
906 success,
907 error_msg,
908 });
909 }
910
911 routes_status
912 }
913
914 async fn handle_new_control_message(
916 &self,
917 msg: ControlMessage,
918 tx: &mpsc::Sender<Result<ControlMessage, Status>>,
919 ) -> Result<(), ControllerError> {
920 match msg.payload {
921 Some(ref payload) => {
922 match payload {
923 Payload::ConfigCommand(config) => {
924 let (connections_status, routes_status) = if config.reconcile {
925 let connections_status =
926 self.diff_connections(&config.connections_to_create).await;
927 let (desired_subs, mut routes_status) =
928 self.resolve_desired_routes(&config.routes_to_set);
929 routes_status
930 .extend(self.delete_stale_subscriptions(&desired_subs).await);
931 routes_status
932 .extend(self.create_new_subscriptions(&desired_subs).await);
933 (connections_status, routes_status)
934 } else {
935 let mut connections_status = Vec::new();
936 let mut routes_status = Vec::new();
937
938 for link_id in &config.connections_to_delete {
940 info!(link_id = %link_id, "received a connection to delete");
941 let mut connection_success = true;
942 let mut connection_error_msg = String::new();
943
944 if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
945 connection_success = false;
946 connection_error_msg = err;
947 }
948
949 connections_status.push(v1::ConnectionAck {
950 link_id: link_id.clone(),
951 success: connection_success,
952 error_msg: connection_error_msg,
953 });
954 }
955
956 for conn in &config.connections_to_create {
958 info!(?conn, "received a connection to create");
959 let mut connection_success = true;
960 let mut connection_error_msg = String::new();
961
962 match serde_json::from_str::<ClientConfig>(&conn.config_data) {
963 Err(e) => {
964 connection_success = false;
965 connection_error_msg =
966 format!("Failed to parse config: {}", e);
967 }
968 Ok(client_config) => {
969 let client_endpoint = &client_config.endpoint;
970 let requested_link_id =
971 if client_config.link_id.trim().is_empty() {
972 String::new()
973 } else {
974 client_config.link_id.clone()
975 };
976 let mut existing_conn_for_link_id = false;
977
978 if !requested_link_id.is_empty() {
979 match self
980 .resolve_connection_by_link_id(&requested_link_id)
981 {
982 Err(err) => {
983 connection_success = false;
984 connection_error_msg = err;
985 }
986 Ok(Some(conn_id)) => {
987 existing_conn_for_link_id = true;
988 self.inner
989 .link_id_to_conn_id
990 .write()
991 .insert(requested_link_id.clone(), conn_id);
992 info!(
993 link_id = %requested_link_id,
994 conn_id,
995 "Connection already exists for link_id"
996 );
997 }
998 Ok(None) => {}
999 }
1000 }
1001
1002 if connection_success && !existing_conn_for_link_id {
1003 match self
1004 .inner
1005 .message_processor
1006 .connect(client_config.clone(), None, None)
1007 .await
1008 {
1009 Err(e) => {
1010 connection_success = false;
1011 connection_error_msg =
1012 format!("Connection failed: {}", e);
1013 }
1014 Ok(conn_id) => {
1015 if !requested_link_id.is_empty() {
1016 self.inner
1017 .link_id_to_conn_id
1018 .write()
1019 .insert(
1020 requested_link_id.clone(),
1021 conn_id.1,
1022 );
1023 }
1024 info!(
1025 endpoint = %client_endpoint, "Successfully created connection",
1026 );
1027 }
1028 }
1029 }
1030 }
1031 }
1032
1033 connections_status.push(v1::ConnectionAck {
1035 link_id: conn.link_id.clone(),
1036 success: connection_success,
1037 error_msg: connection_error_msg,
1038 });
1039 }
1040
1041 for route in &config.routes_to_set {
1043 let mut route_success = true;
1044 let mut route_error_msg = String::new();
1045
1046 let conn = self.resolve_route_connection(route);
1047
1048 if let Ok(Some(conn)) = conn {
1049 let name = route.name.clone().unwrap();
1050 let source = name.clone().with_id(NameId::NULL_COMPONENT);
1051
1052 let msg = DataPlaneMessage::builder()
1053 .source(source.clone())
1054 .destination(name.clone())
1055 .identity("")
1056 .flags(SlimHeaderFlags::default().with_recv_from(conn))
1057 .build_subscribe()
1058 .unwrap();
1059
1060 match self.send_subscribe_message_with_ack(msg).await {
1061 Ok(subscription_id) => {
1062 self.inner
1064 .route_subscription_ids
1065 .lock()
1066 .insert((name.clone(), conn), subscription_id);
1067 info!(?route, "Successfully created route");
1068 }
1069 Err(err) => {
1070 route_success = false;
1071 route_error_msg =
1072 format!("Failed to subscribe: {}", err);
1073 }
1074 }
1075 } else {
1076 route_success = false;
1077 route_error_msg = match conn {
1078 Ok(None) => {
1079 format!(
1080 "Connection with link_id {} not found",
1081 route.link_id.as_deref().unwrap_or("<none>")
1082 )
1083 }
1084 Err(err) => err,
1085 _ => "unknown connection lookup error".to_string(),
1086 };
1087 }
1088
1089 routes_status.push(v1::RouteAck {
1091 route: Some(route.clone()),
1092 success: route_success,
1093 error_msg: route_error_msg,
1094 });
1095 }
1096
1097 for route in &config.routes_to_delete {
1099 let mut route_success = true;
1100 let mut route_error_msg = String::new();
1101
1102 let conn = self.resolve_route_connection(route);
1103
1104 if let Ok(Some(conn)) = conn {
1105 let name = route.name.clone().unwrap();
1106 let source = name.clone().with_id(NameId::NULL_COMPONENT);
1107
1108 let msg = DataPlaneMessage::builder()
1109 .source(source.clone())
1110 .destination(name.clone())
1111 .identity("")
1112 .flags(SlimHeaderFlags::default().with_recv_from(conn))
1113 .build_unsubscribe()
1114 .unwrap();
1115
1116 let sub_id = self
1117 .inner
1118 .route_subscription_ids
1119 .lock()
1120 .remove(&(name.clone(), conn));
1121 let unsubscribe_result = match sub_id {
1122 Some(subscription_id) => {
1123 self.send_unsubscribe_message_with_ack(
1124 msg,
1125 subscription_id,
1126 )
1127 .await
1128 }
1129 None => {
1130 let (ack_id, ack_rx) =
1136 self.inner.subscription_manager.register_ack();
1137 let mut fresh_msg = msg;
1138 fresh_msg.set_subscription_id(ack_id);
1139 if let Err(e) =
1140 self.send_control_message(fresh_msg).await
1141 {
1142 self.inner.subscription_manager.cancel_ack(ack_id);
1143 Err(format!("datapath send error: {}", e.chain()))
1144 } else {
1145 match tokio::time::timeout(
1146 SUBSCRIPTION_ACK_TIMEOUT,
1147 ack_rx,
1148 )
1149 .await
1150 {
1151 Ok(Ok(Ok(()))) => Ok(()),
1152 Ok(Ok(Err(err))) => Err(err.to_string()),
1153 Ok(Err(_)) => {
1154 Err("subscription ack channel closed"
1155 .to_string())
1156 }
1157 Err(_) => {
1158 self.inner
1159 .subscription_manager
1160 .cancel_ack(ack_id);
1161 Err("subscription ack timed out"
1162 .to_string())
1163 }
1164 }
1165 }
1166 }
1167 };
1168 if let Err(err) = unsubscribe_result {
1169 route_success = false;
1170 route_error_msg = format!("Failed to unsubscribe: {}", err);
1171 } else {
1172 info!(?route, "Successfully deleted route");
1173 }
1174 } else {
1175 route_success = false;
1176 route_error_msg = match conn {
1177 Ok(None) => {
1178 format!(
1179 "Connection with link_id {} not found",
1180 route.link_id.as_deref().unwrap_or("<none>")
1181 )
1182 }
1183 Err(err) => err,
1184 _ => "unknown connection lookup error".to_string(),
1185 };
1186 }
1187
1188 routes_status.push(v1::RouteAck {
1190 route: Some(route.clone()),
1191 success: route_success,
1192 error_msg: route_error_msg,
1193 });
1194 }
1195
1196 (connections_status, routes_status)
1197 };
1198
1199 let config_ack = v1::ConfigurationCommandAck {
1200 original_message_id: msg.message_id.clone(),
1201 connections_status,
1202 routes_status,
1203 };
1204
1205 let reply = ControlMessage {
1206 message_id: uuid::Uuid::new_v4().to_string(),
1207 payload: Some(Payload::ConfigCommandAck(config_ack)),
1208 };
1209
1210 if let Err(e) = tx.send(Ok(reply)).await {
1211 error!(error = %e.chain(), "failed to send ConfigurationCommandAck");
1212 }
1213
1214 info!(
1215 connections = %config.connections_to_create.len(),
1216 connections_to_delete = %config.connections_to_delete.len(),
1217 routes_to_set = %config.routes_to_set.len(),
1218 routes_to_del = %config.routes_to_delete.len(),
1219 "Processed ConfigurationCommand"
1220 );
1221 }
1222 Payload::RouteListRequest(_) => {
1223 const CHUNK_SIZE: usize = 100;
1224
1225 let conn_table = self.inner.message_processor.connection_table();
1226 let mut entries = Vec::new();
1227
1228 self.inner.message_processor.subscription_table().for_each(
1229 |name, id, local, remote, _peer| {
1230 let mut entry = RouteEntry {
1231 name: Some(name.clone().with_id(id)),
1232 ..Default::default()
1233 };
1234
1235 for &cid in local {
1236 entry.connections.push(ConnectionEntry {
1237 id: cid,
1238 connection_type: ConnectionType::Local as i32,
1239 config_data: "{}".to_string(),
1240 link_id: None,
1241 direction: ConnectionDirection::Outgoing as i32,
1242 });
1243 }
1244
1245 for &cid in remote {
1246 if let Some(conn) = conn_table.get(cid) {
1247 entry.connections.push(ConnectionEntry {
1248 id: cid,
1249 connection_type: ConnectionType::Remote as i32,
1250 config_data: match conn.config_data() {
1251 Some(data) => serde_json::to_string(data)
1252 .unwrap_or_else(|_| "{}".to_string()),
1253 None => "{}".to_string(),
1254 },
1255 link_id: conn.link_id(),
1256 direction: if conn.is_outgoing() {
1257 ConnectionDirection::Outgoing as i32
1258 } else {
1259 ConnectionDirection::Incoming as i32
1260 },
1261 });
1262 } else {
1263 error!(%cid, "no connection entry for id");
1264 }
1265 }
1266 entries.push(entry);
1267 },
1268 );
1269
1270 let chunks: Vec<_> = entries.chunks(CHUNK_SIZE).collect();
1271 if chunks.is_empty() {
1272 let resp = ControlMessage {
1273 message_id: uuid::Uuid::new_v4().to_string(),
1274 payload: Some(Payload::RouteListResponse(RouteListResponse {
1275 original_message_id: msg.message_id.clone(),
1276 entries: vec![],
1277 done: true,
1278 })),
1279 };
1280 if let Err(e) = tx.send(Ok(resp)).await {
1281 error!(error = %e.chain(), "failed to send route list response");
1282 }
1283 } else {
1284 let n = chunks.len();
1285 for (i, chunk) in chunks.into_iter().enumerate() {
1286 let resp = ControlMessage {
1287 message_id: uuid::Uuid::new_v4().to_string(),
1288 payload: Some(Payload::RouteListResponse(RouteListResponse {
1289 original_message_id: msg.message_id.clone(),
1290 entries: chunk.to_vec(),
1291 done: i + 1 == n,
1292 })),
1293 };
1294 if let Err(e) = tx.send(Ok(resp)).await {
1295 error!(error = %e.chain(), "failed to send route batch");
1296 break;
1297 }
1298 }
1299 }
1300 }
1301 Payload::ConnectionListRequest(_) => {
1302 let mut all_entries = Vec::new();
1303 self.inner
1304 .message_processor
1305 .connection_table()
1306 .for_each(|id, conn| {
1307 debug!(
1308 conn_id = id,
1309 local_addr = ?conn.local_addr(),
1310 remote_addr = ?conn.remote_addr(),
1311 is_outgoing = conn.is_outgoing(),
1312 link_id = ?conn.link_id(),
1313 "connection entry",
1314 );
1315 all_entries.push(ConnectionEntry {
1316 id,
1317 connection_type: ConnectionType::Remote as i32,
1318 config_data: match conn.config_data() {
1319 Some(data) => serde_json::to_string(data)
1320 .unwrap_or_else(|_| "{}".to_string()),
1321 None => "{}".to_string(),
1322 },
1323 link_id: conn.link_id(),
1324 direction: if conn.is_outgoing() {
1325 ConnectionDirection::Outgoing as i32
1326 } else {
1327 ConnectionDirection::Incoming as i32
1328 },
1329 });
1330 });
1331
1332 const CHUNK_SIZE: usize = 100;
1333 let chunks: Vec<_> = all_entries.chunks(CHUNK_SIZE).collect();
1334 if chunks.is_empty() {
1335 let resp = ControlMessage {
1336 message_id: uuid::Uuid::new_v4().to_string(),
1337 payload: Some(Payload::ConnectionListResponse(
1338 ConnectionListResponse {
1339 original_message_id: msg.message_id.clone(),
1340 entries: vec![],
1341 done: true,
1342 },
1343 )),
1344 };
1345 if let Err(e) = tx.send(Ok(resp)).await {
1346 error!(error = %e.chain(), "failed to send connection list response");
1347 }
1348 } else {
1349 let n = chunks.len();
1350 for (i, chunk) in chunks.into_iter().enumerate() {
1351 let resp = ControlMessage {
1352 message_id: uuid::Uuid::new_v4().to_string(),
1353 payload: Some(Payload::ConnectionListResponse(
1354 ConnectionListResponse {
1355 original_message_id: msg.message_id.clone(),
1356 entries: chunk.to_vec(),
1357 done: i + 1 == n,
1358 },
1359 )),
1360 };
1361 if let Err(e) = tx.send(Ok(resp)).await {
1362 error!(error = %e.chain(), "failed to send connection list batch");
1363 break;
1364 }
1365 }
1366 }
1367 }
1368 Payload::RegisterNodeRequest(_) => {
1369 error!("received a register node request");
1370 }
1371 Payload::DeregisterNodeRequest(_) => {
1372 error!("received a deregister node request");
1373 }
1374 _ => {
1375 debug!("received unsupported message type from control - ignoring");
1377 }
1378 }
1379 }
1380 None => {
1381 error!(
1382 message_id = %msg.message_id,
1383 "received control message with no payload",
1384 );
1385 }
1386 }
1387
1388 Ok(())
1389 }
1390
1391 async fn handle_subscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1392 let mut sub_vec = vec![];
1393
1394 let cmd = v1::Route {
1395 name: Some(dst),
1396 link_id: None,
1397 direction: None,
1398 };
1399
1400 sub_vec.push(cmd);
1401
1402 let ctrl = ControlMessage {
1403 message_id: uuid::Uuid::new_v4().to_string(),
1404 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1405 connections_to_create: vec![],
1406 connections_to_delete: vec![],
1407 routes_to_set: sub_vec,
1408 routes_to_delete: vec![],
1409 reconcile: false,
1410 })),
1411 };
1412
1413 return self.send_or_queue_notification(ctrl, clients).await;
1414 }
1415
1416 async fn handle_unsubscribe_message(&self, dst: ProtoName, clients: &[ClientConfig]) {
1417 let mut unsub_vec = vec![];
1418
1419 let cmd = v1::Route {
1420 name: Some(dst),
1421 link_id: None,
1422 direction: None,
1423 };
1424
1425 unsub_vec.push(cmd);
1426
1427 let ctrl = ControlMessage {
1428 message_id: uuid::Uuid::new_v4().to_string(),
1429 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1430 connections_to_create: vec![],
1431 connections_to_delete: vec![],
1432 routes_to_set: vec![],
1433 routes_to_delete: unsub_vec,
1434 reconcile: false,
1435 })),
1436 };
1437
1438 return self.send_or_queue_notification(ctrl, clients).await;
1439 }
1440
1441 async fn send_subscribe_message_with_ack(
1443 &self,
1444 mut msg: DataPlaneMessage,
1445 ) -> Result<u64, String> {
1446 let (ack_id, ack_rx) = self.inner.subscription_manager.register_ack();
1447 msg.set_subscription_id(ack_id);
1448
1449 if let Err(e) = self.send_control_message(msg).await {
1450 self.inner.subscription_manager.cancel_ack(ack_id);
1451 return Err(format!("datapath send error: {}", e.chain()));
1452 }
1453
1454 match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1455 Ok(Ok(Ok(()))) => Ok(ack_id),
1456 Ok(Ok(Err(err))) => Err(err.to_string()),
1457 Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1458 Err(_) => {
1459 self.inner.subscription_manager.cancel_ack(ack_id);
1460 Err("subscription ack timed out".to_string())
1461 }
1462 }
1463 }
1464
1465 async fn send_unsubscribe_message_with_ack(
1467 &self,
1468 mut msg: DataPlaneMessage,
1469 subscription_id: u64,
1470 ) -> Result<(), String> {
1471 let ack_rx = self
1472 .inner
1473 .subscription_manager
1474 .register_ack_with_id(subscription_id);
1475 msg.set_subscription_id(subscription_id);
1476
1477 if let Err(e) = self.send_control_message(msg).await {
1478 self.inner.subscription_manager.cancel_ack(subscription_id);
1479 return Err(format!("datapath send error: {}", e.chain()));
1480 }
1481
1482 match tokio::time::timeout(SUBSCRIPTION_ACK_TIMEOUT, ack_rx).await {
1483 Ok(Ok(Ok(()))) => Ok(()),
1484 Ok(Ok(Err(err))) => Err(err.to_string()),
1485 Ok(Err(_)) => Err("subscription ack channel closed".to_string()),
1486 Err(_) => {
1487 self.inner.subscription_manager.cancel_ack(subscription_id);
1488 Err("subscription ack timed out".to_string())
1489 }
1490 }
1491 }
1492
1493 async fn send_control_message(&self, msg: DataPlaneMessage) -> Result<(), ControllerError> {
1495 self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
1496 error!(error = %e.chain(), "error sending message into datapath");
1497 ControllerError::Datapath(slim_datapath::errors::DataPathError::ConnectionError)
1498 })
1499 }
1500
1501 async fn send_or_queue_notification(&self, ctrl_msg: ControlMessage, clients: &[ClientConfig]) {
1508 let mut sent = false;
1509
1510 for c in clients {
1511 let tx = match self.inner.tx_channels.read().get(&c.endpoint) {
1512 Some(tx) => tx.clone(),
1513 None => continue,
1514 };
1515
1516 match tx.try_send(Ok(ctrl_msg.clone())) {
1517 Ok(()) => {
1518 sent = true;
1519 }
1520 Err(mpsc::error::TrySendError::Full(_)) => {
1521 debug!(
1522 endpoint = %c.endpoint,
1523 "channel full, queuing notification instead of blocking"
1524 );
1525 }
1526 Err(mpsc::error::TrySendError::Closed(_)) => {
1527 debug!(
1528 endpoint = %c.endpoint,
1529 "channel closed, queuing notification"
1530 );
1531 }
1532 }
1533 }
1534
1535 if !sent {
1536 let mut queue = self.inner.pending_notifications.lock();
1537 if queue.len() >= MAX_QUEUED_NOTIFICATIONS {
1538 queue.pop_front();
1539 debug!("queue full, removed oldest notification");
1540 }
1541 queue.push_back(ctrl_msg);
1542 }
1543 }
1544
1545 fn drain_watch(&self) -> Result<drain::Watch, ControllerError> {
1547 self.inner
1548 .drain_watch
1549 .read()
1550 .clone()
1551 .ok_or(ControllerError::AlreadyStopped)
1552 }
1553
1554 async fn send_queued_notifications(
1556 &self,
1557 tx: &mpsc::Sender<Result<ControlMessage, Status>>,
1558 endpoint: &str,
1559 ) {
1560 let notifications = {
1561 let mut queue = self.inner.pending_notifications.lock();
1562 if queue.is_empty() {
1563 return;
1564 }
1565 queue.drain(..).collect::<Vec<_>>()
1566 };
1567
1568 if notifications.is_empty() {
1569 return;
1570 }
1571
1572 debug!(
1573 "sending {} queued subscription notifications to {}",
1574 notifications.len(),
1575 endpoint
1576 );
1577
1578 let mut failed_notifications = Vec::new();
1579 for notification in notifications {
1580 if let Err(e) = tx.send(Ok(notification)).await {
1581 error!(
1582 error = %e.chain(),
1583 %endpoint,
1584 "failed to send queued notification to control plane",
1585 );
1586
1587 failed_notifications.push(e.0.unwrap());
1589 }
1590 }
1591
1592 if !failed_notifications.is_empty() {
1594 self.inner
1595 .pending_notifications
1596 .lock()
1597 .extend(failed_notifications);
1598 }
1599 }
1600
1601 fn process_control_message_stream(
1603 &self,
1604 config: Option<ClientConfig>,
1605 mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
1606 tx: mpsc::Sender<Result<ControlMessage, Status>>,
1607 cancellation_token: CancellationToken,
1608 ) -> Result<JoinHandle<()>, ControllerError> {
1609 let this = self.clone();
1610 let watch = self.drain_watch()?;
1611
1612 let handle = tokio::spawn(async move {
1613 let endpoint = config
1615 .as_ref()
1616 .map(|c| c.endpoint.clone())
1617 .unwrap_or_else(|| "unknown".to_string());
1618 info!(%endpoint, "connected to control plane");
1619
1620 let mut retry_connect = false;
1621
1622 let mut active_connections = Vec::new();
1623 this.inner
1624 .message_processor
1625 .connection_table()
1626 .for_each(|id, conn| {
1627 active_connections.push(v1::ConnectionEntry {
1628 id,
1629 connection_type: v1::ConnectionType::Remote as i32,
1630 config_data: match conn.config_data() {
1631 Some(data) => {
1632 serde_json::to_string(data).unwrap_or_else(|_| "{}".to_string())
1633 }
1634 None => "{}".to_string(),
1635 },
1636 link_id: conn.link_id(),
1637 direction: if conn.is_outgoing() {
1638 v1::ConnectionDirection::Outgoing as i32
1639 } else {
1640 v1::ConnectionDirection::Incoming as i32
1641 },
1642 });
1643 });
1644
1645 let active_routes = {
1646 let conn_id_to_link_id: HashMap<u64, String> = this
1647 .inner
1648 .link_id_to_conn_id
1649 .read()
1650 .iter()
1651 .map(|(lid, cid)| (*cid, lid.clone()))
1652 .collect();
1653 this.inner
1654 .route_subscription_ids
1655 .lock()
1656 .iter()
1657 .map(|((name, conn_id), _sub_id)| v1::Route {
1658 name: Some(name.clone()),
1659 link_id: conn_id_to_link_id.get(conn_id).cloned(),
1660 direction: None,
1661 })
1662 .collect::<Vec<_>>()
1663 };
1664
1665 let register_request = ControlMessage {
1666 message_id: uuid::Uuid::new_v4().to_string(),
1667 payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
1668 node_id: this.inner.id.to_string(),
1669 group_name: this.inner.group_name.clone(),
1670 connection_details: this.inner.connection_details.clone(),
1671 connections: active_connections,
1672 routes: active_routes,
1673 })),
1674 };
1675
1676 if config.is_some()
1678 && let Err(e) = tx.send(Ok(register_request)).await
1679 {
1680 error!(error = %e.chain(), "failed to send register request");
1681 return;
1682 }
1683
1684 this.send_queued_notifications(&tx, &endpoint).await;
1687
1688 if config.is_some() {
1690 let registration_result = tokio::time::timeout(
1691 Duration::from_secs(10),
1692 async {
1693 loop {
1694 tokio::select! {
1695 next = stream.next() => {
1696 match next {
1697 Some(Ok(msg)) => {
1698 if let Some(Payload::RegisterNodeResponse(resp)) = msg.payload {
1699 if resp.success {
1700 info!(
1701 connections = resp.connections.len(),
1702 routes = resp.routes.len(),
1703 "registration acknowledged by control plane"
1704 );
1705 return Ok(resp);
1706 } else {
1707 return Err("registration rejected by control plane".to_string());
1708 }
1709 }
1710 }
1711 Some(Err(_)) => return Err("stream error waiting for registration response".to_string()),
1712 None => return Err("stream closed before registration response".to_string()),
1713 }
1714 }
1715 _ = cancellation_token.cancelled() => {
1716 return Err("cancelled while waiting for registration response".to_string());
1717 }
1718 }
1719 }
1720 }
1721 ).await;
1722
1723 match registration_result {
1724 Ok(Ok(resp)) => {
1725 if !resp.connections.is_empty() || !resp.routes.is_empty() {
1727 let init_cmd = ControlMessage {
1728 message_id: uuid::Uuid::new_v4().to_string(),
1729 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1730 connections_to_create: resp.connections,
1731 routes_to_set: resp.routes,
1732 routes_to_delete: vec![],
1733 connections_to_delete: vec![],
1734 reconcile: true,
1735 })),
1736 };
1737 if let Err(e) = this.handle_new_control_message(init_cmd, &tx).await {
1738 error!(error = %e.chain(), "failed to apply initial state from registration");
1739 }
1740 }
1741 }
1742 Ok(Err(e)) => {
1743 error!(%e, "registration handshake failed");
1744 return;
1745 }
1746 Err(_) => {
1747 error!("registration handshake timed out");
1748 return;
1749 }
1750 }
1751 }
1752
1753 let mut drain_fut = std::pin::pin!(watch.clone().signaled());
1754
1755 loop {
1756 tokio::select! {
1757 next = stream.next() => {
1758 match next {
1759 Some(Ok(msg)) => {
1760 if let Err(e) = this.handle_new_control_message(msg, &tx).await {
1761 error!(error = %e.chain(), "error processing incoming control message");
1762 }
1763 }
1764 Some(Err(e)) => {
1765 if let Some(io_err) = Self::match_for_io_error(&e) {
1766 if io_err.kind() == std::io::ErrorKind::BrokenPipe {
1767 info!("connection closed by peer");
1768 } else {
1769 error!(
1771 error = %e.chain(),
1772 io_error_kind = ?io_err.kind(),
1773 "IO error receiving control messages"
1774 );
1775 }
1776 } else {
1777 error!(error = %e.chain(), "error receiving control messages");
1779 }
1780
1781 retry_connect = true;
1782 break;
1783 }
1784 None => {
1785 debug!("end of stream");
1786 retry_connect = true;
1787 break;
1788 }
1789 }
1790 }
1791 _ = cancellation_token.cancelled() => {
1792 debug!("shutting down stream on cancellation token");
1793 break;
1794 }
1795 _ = &mut drain_fut => {
1796 debug!("shutting down stream on drain");
1797 break;
1798 }
1799 }
1800 }
1801
1802 info!(%endpoint, "control plane stream closed");
1803
1804 if retry_connect && let Some(config) = config {
1805 info!(%config.endpoint, "retrying connection to control plane");
1806 this.connect(config.clone(), cancellation_token)
1807 .await
1808 .map_or_else(
1809 |e| {
1810 error!(error = %e.chain(), "failed to reconnect to control plane");
1811 },
1812 |tx| {
1813 info!(%config.endpoint, "reconnected to control plane");
1814
1815 this.inner
1816 .tx_channels
1817 .write()
1818 .insert(config.endpoint.clone(), tx);
1819 },
1820 )
1821 }
1822 });
1823
1824 Ok(handle)
1825 }
1826
1827 async fn connect(
1831 &self,
1832 config: ClientConfig,
1833 cancellation_token: CancellationToken,
1834 ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
1835 info!(%config.endpoint, "connecting to control plane");
1836
1837 self.inner.route_subscription_ids.lock().clear();
1841 self.inner.link_id_to_conn_id.write().clear();
1842
1843 let channel = match config.to_channel().await? {
1844 TransportChannel::Grpc(c) => c,
1845 TransportChannel::Websocket(_) => {
1846 return Err(ControllerError::ConfigError(
1847 slim_config::errors::ConfigError::GrpcChannelUnsupportedTransport,
1848 ));
1849 }
1850 };
1851
1852 let mut client = ControllerServiceClient::new(channel.clone());
1853 let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1854 let out_stream = ReceiverStream::new(rx).filter_map(|res| match res {
1855 Ok(msg) => Some(msg),
1856 Err(e) => {
1857 error!(error = %e, "dropping outbound control message due to error");
1858 None
1859 }
1860 });
1861 let stream = client
1862 .open_control_channel(Request::new(out_stream))
1863 .await?;
1864
1865 let endpoint_key = config.endpoint.clone();
1867 let handle = self.process_control_message_stream(
1868 Some(config),
1869 stream.into_inner(),
1870 tx.clone(),
1871 cancellation_token.clone(),
1872 )?;
1873 self.inner
1874 .stream_handles
1875 .lock()
1876 .insert(endpoint_key, handle);
1877
1878 Ok(tx)
1880 }
1881
1882 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1883 let mut err: &(dyn std::error::Error + 'static) = err_status;
1884
1885 loop {
1886 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1887 return Some(io_err);
1888 }
1889
1890 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1893 && let Some(io_err) = h2_err.get_io()
1894 {
1895 return Some(io_err);
1896 }
1897
1898 err = err.source()?;
1899 }
1900 }
1901}
1902
1903#[tonic::async_trait]
1904impl GrpcControllerService for ControllerService {
1905 type OpenControlChannelStream =
1906 Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
1907
1908 async fn open_control_channel(
1909 &self,
1910 request: Request<tonic::Streaming<ControlMessage>>,
1911 ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
1912 let remote_endpoint = request
1914 .remote_addr()
1915 .map(|addr| addr.to_string())
1916 .unwrap_or_else(|| "unknown".to_string());
1917
1918 let stream = request.into_inner();
1919 let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1920
1921 let cancellation_token = CancellationToken::new();
1922
1923 let handle = self
1925 .process_control_message_stream(None, stream, tx.clone(), cancellation_token.clone())
1926 .map_err(|e| {
1927 error!(error = %e.chain(), "error processing control message stream");
1928 Status::unavailable("failed to process control message stream")
1929 })?;
1930 self.inner
1931 .stream_handles
1932 .lock()
1933 .insert(remote_endpoint.clone(), handle);
1934
1935 self.inner
1936 .tx_channels
1937 .write()
1938 .insert(remote_endpoint.clone(), tx);
1939
1940 if let Some(old_token) = self
1941 .inner
1942 .cancellation_tokens
1943 .write()
1944 .insert(remote_endpoint.clone(), cancellation_token)
1945 {
1946 old_token.cancel();
1947 }
1948
1949 let out_stream = ReceiverStream::new(rx);
1950 Ok(Response::new(
1951 Box::pin(out_stream) as Self::OpenControlChannelStream
1952 ))
1953 }
1954}
1955
1956#[cfg(test)]
1957mod tests {
1958 use super::*;
1959 use slim_config::component::id::Kind;
1960 use tracing_test::traced_test;
1961
1962 async fn setup_control_planes(
1963 server_endpoint: &str,
1964 server_name: &str,
1965 client_name: &str,
1966 ) -> (ControlPlane, ControlPlane, ClientConfig) {
1967 let id_server = ID::new_with_name(Kind::new("slim").unwrap(), server_name).unwrap();
1968 let id_client = ID::new_with_name(Kind::new("slim").unwrap(), client_name).unwrap();
1969
1970 let server_config = ServerConfig::with_endpoint(server_endpoint)
1971 .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
1972 let client_config = ClientConfig::with_endpoint(&format!("http://{}", server_endpoint))
1973 .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());
1974
1975 let message_processor_server = MessageProcessor::new();
1976 let message_processor_client = MessageProcessor::new();
1977
1978 let control_plane_server = ControlPlane::new(ControlPlaneSettings {
1979 id: id_server,
1980 group_name: None,
1981 servers: vec![server_config.clone()],
1982 clients: vec![],
1983 message_processor: Arc::new(message_processor_server),
1984 connection_details: vec![from_server_config(&server_config)],
1985 });
1986
1987 let control_plane_client = ControlPlane::new(ControlPlaneSettings {
1988 id: id_client,
1989 group_name: None,
1990 servers: vec![],
1991 clients: vec![client_config.clone()],
1992 message_processor: Arc::new(message_processor_client),
1993 connection_details: vec![],
1994 });
1995
1996 (control_plane_server, control_plane_client, client_config)
1997 }
1998
1999 #[tokio::test]
2000 #[traced_test]
2001 async fn test_end_to_end() {
2002 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2003 setup_control_planes(
2004 "127.0.0.1:50051",
2005 "test-server-instance",
2006 "test-client-instance",
2007 )
2008 .await;
2009
2010 control_plane_server.run().await.unwrap();
2011 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2012 control_plane_client.run().await.unwrap();
2013 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2014
2015 assert!(logs_contain("received a register node request"));
2016 }
2017
2018 #[tokio::test]
2019 #[traced_test]
2020 async fn test_subscription_notification_queue_drain() {
2021 let (mut control_plane_server, mut control_plane_client, client_config) =
2023 setup_control_planes(
2024 "127.0.0.1:50061",
2025 "queue-drain-server",
2026 "queue-drain-client",
2027 )
2028 .await;
2029
2030 let controller = control_plane_client.controller.clone();
2031 assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2032
2033 const N: usize = 5;
2034 for i in 0..N {
2035 let ctrl_msg = ControlMessage {
2036 message_id: uuid::Uuid::new_v4().to_string(),
2037 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2038 connections_to_create: vec![],
2039 connections_to_delete: vec![],
2040 routes_to_set: vec![v1::Route {
2041 name: Some(
2042 ProtoName::from_strings(["queued", "sub", &format!("name-{i}")])
2043 .with_id(i as u128),
2044 ),
2045 link_id: None,
2046 direction: None,
2047 }],
2048 routes_to_delete: vec![],
2049 reconcile: false,
2050 })),
2051 };
2052 controller
2053 .send_or_queue_notification(ctrl_msg, std::slice::from_ref(&client_config))
2054 .await;
2055 }
2056 assert_eq!(controller.inner.pending_notifications.lock().len(), N);
2057
2058 control_plane_server.run().await.expect("server run failed");
2059 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2060 control_plane_client.run().await.expect("client run failed");
2061 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2062
2063 assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2064 assert!(
2065 logs_contain(&format!("sending {} queued subscription notifications", N)),
2066 "Expected log about sending queued subscription notifications"
2067 );
2068
2069 drop(controller);
2070 drop(control_plane_server);
2071 drop(control_plane_client);
2072 }
2073
2074 #[tokio::test]
2075 #[traced_test]
2076 async fn test_delete_connection_by_link_id_success_ack() {
2077 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2078 setup_control_planes(
2079 "127.0.0.1:50081",
2080 "delete-linkid-server",
2081 "delete-linkid-client",
2082 )
2083 .await;
2084
2085 control_plane_server.run().await.unwrap();
2086 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2087 control_plane_client.run().await.unwrap();
2088 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2089
2090 let controller = control_plane_client.controller.clone();
2091 let link_id = "test-delete-link-id".to_string();
2092 let mut assigned = false;
2093 for _ in 0..50 {
2094 controller
2095 .inner
2096 .message_processor
2097 .connection_table()
2098 .for_each(|_, conn| {
2099 if !assigned {
2100 conn.set_link_id(link_id.clone());
2101 assigned = true;
2102 }
2103 });
2104 if assigned {
2105 break;
2106 }
2107 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2108 }
2109 assert!(
2110 assigned,
2111 "expected at least one connection to assign link_id"
2112 );
2113
2114 let ctrl_msg = ControlMessage {
2115 message_id: uuid::Uuid::new_v4().to_string(),
2116 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2117 connections_to_create: vec![],
2118 connections_to_delete: vec![link_id.clone()],
2119 routes_to_set: vec![],
2120 routes_to_delete: vec![],
2121 reconcile: false,
2122 })),
2123 };
2124 let (tx, mut rx) = mpsc::channel(1);
2125 controller
2126 .handle_new_control_message(ctrl_msg, &tx)
2127 .await
2128 .expect("config command must be handled");
2129
2130 let ack_msg = rx
2131 .recv()
2132 .await
2133 .expect("expected ack message")
2134 .expect("ack should be ok");
2135 let ack = match ack_msg.payload {
2136 Some(Payload::ConfigCommandAck(ack)) => ack,
2137 _ => panic!("expected ConfigCommandAck payload"),
2138 };
2139 assert_eq!(ack.connections_status.len(), 1);
2140 assert_eq!(ack.connections_status[0].link_id, link_id);
2141 assert!(ack.connections_status[0].success);
2142 }
2143
2144 #[tokio::test]
2145 #[traced_test]
2146 async fn test_delete_connection_by_link_id_unknown_fails_ack() {
2147 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2148 "127.0.0.1:50082",
2149 "delete-linkid-server-unknown",
2150 "delete-linkid-client-unknown",
2151 )
2152 .await;
2153
2154 let controller = control_plane_client.controller.clone();
2155 let ctrl_msg = ControlMessage {
2156 message_id: uuid::Uuid::new_v4().to_string(),
2157 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2158 connections_to_create: vec![],
2159 connections_to_delete: vec!["unknown-link-id".to_string()],
2160 routes_to_set: vec![],
2161 routes_to_delete: vec![],
2162 reconcile: false,
2163 })),
2164 };
2165 let (tx, mut rx) = mpsc::channel(1);
2166 controller
2167 .handle_new_control_message(ctrl_msg, &tx)
2168 .await
2169 .expect("config command must be handled");
2170
2171 let ack_msg = rx
2172 .recv()
2173 .await
2174 .expect("expected ack message")
2175 .expect("ack should be ok");
2176 let ack = match ack_msg.payload {
2177 Some(Payload::ConfigCommandAck(ack)) => ack,
2178 _ => panic!("expected ConfigCommandAck payload"),
2179 };
2180 assert_eq!(ack.connections_status.len(), 1);
2181 assert_eq!(ack.connections_status[0].link_id, "unknown-link-id");
2182 assert!(!ack.connections_status[0].success);
2183 assert!(ack.connections_status[0].error_msg.contains("not found"));
2184
2185 drop(control_plane_server);
2186 }
2187
2188 #[tokio::test]
2189 #[traced_test]
2190 async fn test_create_connection_with_existing_link_id_reuses_connection_ack() {
2191 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2192 setup_control_planes(
2193 "127.0.0.1:50083",
2194 "create-linkid-server",
2195 "create-linkid-client",
2196 )
2197 .await;
2198
2199 control_plane_server.run().await.unwrap();
2200 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2201 control_plane_client.run().await.unwrap();
2202 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2203
2204 let controller = control_plane_client.controller.clone();
2205 let link_id = "test-create-link-id".to_string();
2206 let mut assigned = false;
2207 for _ in 0..50 {
2208 controller
2209 .inner
2210 .message_processor
2211 .connection_table()
2212 .for_each(|_, conn| {
2213 if !assigned {
2214 conn.set_link_id(link_id.clone());
2215 assigned = true;
2216 }
2217 });
2218 if assigned {
2219 break;
2220 }
2221 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2222 }
2223 assert!(
2224 assigned,
2225 "expected at least one connection to assign link_id"
2226 );
2227
2228 let endpoint = "http://127.0.0.1:59999";
2229 let connection_config = serde_json::json!({
2230 "endpoint": endpoint,
2231 "link_id": link_id
2232 })
2233 .to_string();
2234
2235 let ctrl_msg = ControlMessage {
2236 message_id: uuid::Uuid::new_v4().to_string(),
2237 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2238 connections_to_create: vec![v1::Connection {
2239 link_id: "reuse-existing-link".to_string(),
2240 config_data: connection_config,
2241 }],
2242 connections_to_delete: vec![],
2243 routes_to_set: vec![],
2244 routes_to_delete: vec![],
2245 reconcile: false,
2246 })),
2247 };
2248
2249 let (tx, mut rx) = mpsc::channel(1);
2250 controller
2251 .handle_new_control_message(ctrl_msg, &tx)
2252 .await
2253 .expect("config command must be handled");
2254
2255 let ack_msg = rx
2256 .recv()
2257 .await
2258 .expect("expected ack message")
2259 .expect("ack should be ok");
2260 let ack = match ack_msg.payload {
2261 Some(Payload::ConfigCommandAck(ack)) => ack,
2262 _ => panic!("expected ConfigCommandAck payload"),
2263 };
2264 assert_eq!(ack.connections_status.len(), 1);
2265 assert_eq!(ack.connections_status[0].link_id, "reuse-existing-link");
2266 assert!(ack.connections_status[0].success);
2267
2268 assert!(
2269 controller
2270 .inner
2271 .link_id_to_conn_id
2272 .read()
2273 .contains_key(&link_id),
2274 "expected link_id to be mapped to reused connection id"
2275 );
2276 }
2277
2278 #[tokio::test]
2279 #[traced_test]
2280 async fn test_subscription_set_unknown_link_id_fails_ack() {
2281 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2282 "127.0.0.1:50084",
2283 "sub-linkid-server-unknown",
2284 "sub-linkid-client-unknown",
2285 )
2286 .await;
2287
2288 let controller = control_plane_client.controller.clone();
2289 let ctrl_msg = ControlMessage {
2290 message_id: uuid::Uuid::new_v4().to_string(),
2291 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2292 connections_to_create: vec![],
2293 connections_to_delete: vec![],
2294 routes_to_set: vec![v1::Route {
2295 name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2296 link_id: Some("missing-link-id".to_string()),
2297 direction: None,
2298 }],
2299 routes_to_delete: vec![],
2300 reconcile: false,
2301 })),
2302 };
2303 let (tx, mut rx) = mpsc::channel(1);
2304 controller
2305 .handle_new_control_message(ctrl_msg, &tx)
2306 .await
2307 .expect("config command must be handled");
2308
2309 let ack_msg = rx
2310 .recv()
2311 .await
2312 .expect("expected ack message")
2313 .expect("ack should be ok");
2314 let ack = match ack_msg.payload {
2315 Some(Payload::ConfigCommandAck(ack)) => ack,
2316 _ => panic!("expected ConfigCommandAck payload"),
2317 };
2318
2319 assert_eq!(ack.routes_status.len(), 1);
2320 assert!(!ack.routes_status[0].success);
2321 assert!(
2322 ack.routes_status[0]
2323 .error_msg
2324 .contains("Connection with link_id missing-link-id not found")
2325 );
2326
2327 drop(control_plane_server);
2328 }
2329
2330 #[tokio::test]
2331 #[traced_test]
2332 async fn test_create_connection_invalid_config_fails_ack() {
2333 let (_control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2334 "127.0.0.1:50085",
2335 "create-invalid-config-server",
2336 "create-invalid-config-client",
2337 )
2338 .await;
2339
2340 let controller = control_plane_client.controller.clone();
2341 let ctrl_msg = ControlMessage {
2342 message_id: uuid::Uuid::new_v4().to_string(),
2343 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2344 connections_to_create: vec![v1::Connection {
2345 link_id: "invalid-config-conn".to_string(),
2346 config_data: "{invalid-json".to_string(),
2347 }],
2348 connections_to_delete: vec![],
2349 routes_to_set: vec![],
2350 routes_to_delete: vec![],
2351 reconcile: false,
2352 })),
2353 };
2354 let (tx, mut rx) = mpsc::channel(1);
2355 controller
2356 .handle_new_control_message(ctrl_msg, &tx)
2357 .await
2358 .expect("config command must be handled");
2359
2360 let ack_msg = rx
2361 .recv()
2362 .await
2363 .expect("expected ack message")
2364 .expect("ack should be ok");
2365 let ack = match ack_msg.payload {
2366 Some(Payload::ConfigCommandAck(ack)) => ack,
2367 _ => panic!("expected ConfigCommandAck payload"),
2368 };
2369 assert_eq!(ack.connections_status.len(), 1);
2370 assert!(!ack.connections_status[0].success);
2371 assert!(
2372 ack.connections_status[0]
2373 .error_msg
2374 .contains("Failed to parse config")
2375 );
2376 }
2377
2378 #[tokio::test]
2379 #[traced_test]
2380 async fn test_subscription_delete_unknown_link_id_fails_ack() {
2381 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2382 "127.0.0.1:50086",
2383 "sub-del-linkid-server-unknown",
2384 "sub-del-linkid-client-unknown",
2385 )
2386 .await;
2387
2388 let controller = control_plane_client.controller.clone();
2389 let ctrl_msg = ControlMessage {
2390 message_id: uuid::Uuid::new_v4().to_string(),
2391 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2392 connections_to_create: vec![],
2393 connections_to_delete: vec![],
2394 routes_to_set: vec![],
2395 routes_to_delete: vec![v1::Route {
2396 name: Some(ProtoName::from_strings(["org", "ns", "agent"]).with_id(1u128)),
2397 link_id: Some("missing-link-id-delete".to_string()),
2398 direction: None,
2399 }],
2400 reconcile: false,
2401 })),
2402 };
2403 let (tx, mut rx) = mpsc::channel(1);
2404 controller
2405 .handle_new_control_message(ctrl_msg, &tx)
2406 .await
2407 .expect("config command must be handled");
2408
2409 let ack_msg = rx
2410 .recv()
2411 .await
2412 .expect("expected ack message")
2413 .expect("ack should be ok");
2414 let ack = match ack_msg.payload {
2415 Some(Payload::ConfigCommandAck(ack)) => ack,
2416 _ => panic!("expected ConfigCommandAck payload"),
2417 };
2418
2419 assert_eq!(ack.routes_status.len(), 1);
2420 assert!(!ack.routes_status[0].success);
2421 assert!(
2422 ack.routes_status[0]
2423 .error_msg
2424 .contains("Connection with link_id missing-link-id-delete not found")
2425 );
2426
2427 drop(control_plane_server);
2428 }
2429
2430 #[tokio::test]
2431 #[traced_test]
2432 async fn test_shutdown_drains_resources() {
2433 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2435 setup_control_planes(
2436 "127.0.0.1:50071",
2437 "shutdown-server-instance",
2438 "shutdown-client-instance",
2439 )
2440 .await;
2441
2442 control_plane_server
2444 .run()
2445 .await
2446 .expect("server should start");
2447 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2448 control_plane_client
2449 .run()
2450 .await
2451 .expect("client should start");
2452 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2453
2454 let server_tokens_before = control_plane_server
2456 .controller
2457 .inner
2458 .cancellation_tokens
2459 .read()
2460 .len();
2461 assert!(
2462 server_tokens_before > 0,
2463 "expected server to have active cancellation tokens before shutdown"
2464 );
2465
2466 let client_tokens_before = control_plane_client
2467 .controller
2468 .inner
2469 .cancellation_tokens
2470 .read()
2471 .len();
2472 assert!(
2473 client_tokens_before > 0,
2474 "expected client to have active cancellation tokens before shutdown"
2475 );
2476
2477 control_plane_client
2479 .shutdown()
2480 .await
2481 .expect("client shutdown ok");
2482 control_plane_server
2483 .shutdown()
2484 .await
2485 .expect("server shutdown ok");
2486
2487 let server_tokens_after = control_plane_server
2489 .controller
2490 .inner
2491 .cancellation_tokens
2492 .read()
2493 .len();
2494 assert_eq!(
2495 server_tokens_after, 0,
2496 "expected server cancellation tokens to be drained after shutdown"
2497 );
2498
2499 let client_tokens_after = control_plane_client
2500 .controller
2501 .inner
2502 .cancellation_tokens
2503 .read()
2504 .len();
2505 assert_eq!(
2506 client_tokens_after, 0,
2507 "expected client cancellation tokens to be drained after shutdown"
2508 );
2509
2510 assert!(
2512 control_plane_server.shutdown().await.is_err(),
2513 "second shutdown on server should return an error"
2514 );
2515 assert!(
2516 control_plane_client.shutdown().await.is_err(),
2517 "second shutdown on client should return an error"
2518 );
2519 }
2520
2521 #[tokio::test]
2522 #[traced_test]
2523 async fn test_shutdown_without_run() {
2524 let (control_plane_server, mut _control_plane_client, _client_cfg) = setup_control_planes(
2526 "127.0.0.1:50072",
2527 "shutdown-no-run-server",
2528 "shutdown-no-run-client",
2529 )
2530 .await;
2531
2532 assert_eq!(
2534 control_plane_server
2535 .controller
2536 .inner
2537 .cancellation_tokens
2538 .read()
2539 .len(),
2540 0,
2541 "expected zero cancellation tokens before shutdown when not run"
2542 );
2543
2544 control_plane_server
2546 .shutdown()
2547 .await
2548 .expect("shutdown without prior run should succeed");
2549
2550 assert_eq!(
2552 control_plane_server
2553 .controller
2554 .inner
2555 .cancellation_tokens
2556 .read()
2557 .len(),
2558 0,
2559 "expected zero cancellation tokens after shutdown when not run"
2560 );
2561
2562 assert!(
2564 control_plane_server.shutdown().await.is_err(),
2565 "second shutdown should error due to missing drain signal"
2566 );
2567 }
2568}