1use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use std::time::Duration;
9use std::vec;
10
11use display_error_chain::ErrorChainExt;
12use slim_config::component::id::ID;
13use slim_config::grpc::server::ServerConfig;
14use slim_session::SessionMessage;
15use slim_session::subscription_manager::SubscriptionManager;
16use tokio::sync::mpsc;
17use tokio::task::JoinHandle;
18use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
19use tokio_util::sync::CancellationToken;
20use tonic::{Request, Response, Status};
21use tracing::{debug, error, info};
22
23use crate::api::proto::api::v1::control_message::Payload;
24use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
25use crate::api::proto::api::v1::{
26 self, ConnectionDetails, ConnectionDirection, ConnectionListResponse, ConnectionType,
27 SubscriptionListResponse,
28};
29use crate::api::proto::api::v1::{
30 Ack, ConnectionEntry, ControlMessage, SubscriptionEntry,
31 controller_service_client::ControllerServiceClient,
32 controller_service_server::ControllerService as GrpcControllerService,
33};
34use crate::errors::ControllerError;
35use prost_types::Struct;
36use slim_auth::auth_provider::{AuthProvider, AuthVerifier};
37use slim_auth::traits::TokenProvider;
38use slim_config::grpc::client::ClientConfig;
39use slim_datapath::api::{
40 CommandPayload, Content, MessageType::Link as LinkType, MessageType::Publish,
41 MessageType::Subscribe, MessageType::SubscriptionAck as SubscriptionAckType,
42 MessageType::Unsubscribe, ProtoMessage as DataPlaneMessage,
43};
44use slim_datapath::api::{ProtoSessionMessageType, ProtoSessionType};
45use slim_datapath::message_processing::MessageProcessor;
46use slim_datapath::messages::Name;
47use slim_datapath::messages::encoder::calculate_hash;
48use slim_datapath::messages::utils::{DELETE_GROUP, IS_MODERATOR, SlimHeaderFlags, TRUE_VAL};
49use slim_datapath::tables::SubscriptionTable;
50
51use slim_session::timer::{Timer, TimerType};
52use slim_session::timer_factory::{TimerFactory, TimerSettings};
53
54type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
55type TxChannels = HashMap<String, TxChannel>;
56
57const CONTROLLER_COMPONENT: &str = "controller";
59const MAX_QUEUED_NOTIFICATIONS: usize = 1000; #[derive(Clone)]
64pub struct ControlPlaneSettings {
65 pub id: ID,
67 pub group_name: Option<String>,
69 pub servers: Vec<ServerConfig>,
71 pub clients: Vec<ClientConfig>,
73 pub message_processor: Arc<MessageProcessor>,
75 pub auth_provider: Option<AuthProvider>,
77 pub auth_verifier: Option<AuthVerifier>,
79 pub connection_details: Vec<ConnectionDetails>,
82}
83
84struct ControllerServiceInternal {
89 id: ID,
91
92 controller_name: slim_datapath::messages::Name,
94
95 group_name: Option<String>,
97
98 message_processor: Arc<MessageProcessor>,
100
101 connections: Arc<parking_lot::RwLock<HashMap<String, u64>>>,
103
104 tx_slim: mpsc::Sender<Result<DataPlaneMessage, Status>>,
106
107 tx_channels: parking_lot::RwLock<TxChannels>,
109
110 cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,
112
113 drain_watch: parking_lot::RwLock<Option<drain::Watch>>,
115
116 auth_provider: Option<AuthProvider>,
118
119 _auth_verifier: Option<AuthVerifier>,
121
122 pending_notifications: Arc<parking_lot::Mutex<Vec<ControlMessage>>>,
124
125 subscription_manager: SubscriptionManager,
127
128 message_id_map: Arc<parking_lot::RwLock<HashMap<u32, (String, Option<Timer>)>>>,
130
131 timer_factory: parking_lot::RwLock<Option<TimerFactory>>,
136
137 connection_details: Vec<ConnectionDetails>,
139
140 route_subscription_ids: parking_lot::Mutex<HashMap<(Name, u64), u64>>,
142}
143
144#[derive(Clone)]
145struct ControllerService {
146 inner: Arc<ControllerServiceInternal>,
148}
149
150pub struct ControlPlane {
152 servers: Vec<ServerConfig>,
154
155 clients: Vec<ClientConfig>,
157
158 drain_signal: parking_lot::RwLock<Option<drain::Signal>>,
160
161 controller: ControllerService,
163
164 rx_slim_option: Option<mpsc::Receiver<Result<DataPlaneMessage, Status>>>,
167}
168
169impl Drop for ControlPlane {
172 fn drop(&mut self) {
173 for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
175 token.cancel();
176 }
177 }
178}
179
180pub(crate) fn from_server_config(server_config: &ServerConfig) -> ConnectionDetails {
181 let metadata = server_config.metadata.as_ref().map(|m| {
183 let fields = m
184 .inner
185 .iter()
186 .map(|(k, v)| (k.clone(), prost_types::Value::from(v)))
187 .collect();
188 Struct { fields }
189 });
190
191 let auth = serde_json::to_string(&server_config.auth).ok();
193
194 let tls = serde_json::to_string(&server_config.tls_setting.config).ok();
196
197 ConnectionDetails {
198 endpoint: server_config.endpoint.clone(),
199 mtls_required: !server_config.tls_setting.insecure,
200 metadata,
201 auth,
202 tls,
203 }
204}
205
206impl ControlPlane {
208 pub fn new(config: ControlPlaneSettings) -> Self {
221 let (_, tx_slim, rx_slim) = config
223 .message_processor
224 .register_local_connection(true)
225 .unwrap();
226
227 let (signal, watch) = drain::channel();
228 let controller_name = Name::from_strings([
229 CONTROLLER_COMPONENT,
230 CONTROLLER_COMPONENT,
231 CONTROLLER_COMPONENT,
232 ])
233 .with_id(rand::random::<u64>());
234 debug!("create controller with name: {}", controller_name);
235
236 ControlPlane {
237 servers: config.servers,
238 clients: config.clients,
239 controller: ControllerService {
240 inner: Arc::new(ControllerServiceInternal {
241 id: config.id,
242 controller_name,
243 group_name: config.group_name,
244 message_processor: config.message_processor,
245 connections: Arc::new(parking_lot::RwLock::new(HashMap::new())),
246 subscription_manager: SubscriptionManager::new(tx_slim.clone()),
247 tx_slim,
248 tx_channels: parking_lot::RwLock::new(HashMap::new()),
249 cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
250 drain_watch: parking_lot::RwLock::new(Some(watch)),
251 auth_provider: config.auth_provider,
252 _auth_verifier: config.auth_verifier,
253 pending_notifications: Arc::new(parking_lot::Mutex::new(Vec::new())),
254 message_id_map: Arc::new(parking_lot::RwLock::new(HashMap::new())),
255 timer_factory: parking_lot::RwLock::new(None),
256 connection_details: config.connection_details,
257 route_subscription_ids: parking_lot::Mutex::new(HashMap::new()),
258 }),
259 },
260 drain_signal: parking_lot::RwLock::new(Some(signal)),
261 rx_slim_option: Some(rx_slim),
262 }
263 }
264
265 pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
267 self.clients = clients;
268 self
269 }
270
271 pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
273 self.servers = servers;
274 self
275 }
276
277 pub async fn run(&mut self) -> Result<(), ControllerError> {
284 let rx = self
285 .rx_slim_option
286 .take()
287 .ok_or(ControllerError::AlreadyStarted)?;
288
289 let servers = self.servers.clone();
291 let clients = self.clients.clone();
292
293 for server in servers {
295 self.run_server(server).await?;
296 }
297
298 for client in clients {
300 self.run_client(client).await?;
301 }
302
303 self.listen_from_data_plane(rx).await?;
304
305 Ok(())
306 }
307
308 pub async fn shutdown(&self) -> Result<(), ControllerError> {
309 let signal = self
311 .drain_signal
312 .write()
313 .take()
314 .ok_or(ControllerError::AlreadyStopped)?;
315
316 self.controller
318 .inner
319 .cancellation_tokens
320 .write()
321 .drain()
322 .for_each(|(endpoint, token)| {
323 info!(%endpoint, "stopping");
324 token.cancel();
325 });
326
327 self.controller.inner.drain_watch.write().take();
329
330 signal.drain().await;
332
333 Ok(())
334 }
335
336 async fn listen_from_data_plane(
337 &mut self,
338 mut rx: mpsc::Receiver<Result<DataPlaneMessage, Status>>,
339 ) -> Result<(), ControllerError> {
340 let cancellation_token = CancellationToken::new();
341 let cancellation_token_clone = cancellation_token.clone();
342
343 self.controller
344 .inner
345 .cancellation_tokens
346 .write()
347 .insert("DATA_PLANE".to_string(), cancellation_token_clone);
348
349 let clients = self.clients.clone();
350 let controller = self.controller.clone();
351
352 let controller_name = self.controller.inner.controller_name.clone();
354 let subscribe_msg = DataPlaneMessage::builder()
355 .source(controller_name.clone())
356 .destination(controller_name.clone())
357 .identity(controller_name.to_string())
358 .build_subscribe()
359 .unwrap();
360
361 controller
362 .inner
363 .tx_slim
364 .send(Ok(subscribe_msg))
365 .await
366 .map_err(|e| {
367 error!(error = %e.chain(), "failed to send subscribe message to data plane");
368 ControllerError::DatapathSendError(e.to_string())
369 })?;
370
371 let watch = self.controller.drain_watch()?;
373
374 debug!("Starting data plane listener: {}", controller_name);
375 tokio::spawn(async move {
376 let mut drain_fut = std::pin::pin!(watch.signaled());
377 loop {
378 tokio::select! {
379 next = rx.recv() => {
380 match next {
381 Some(res) => {
382 match res {
383 Ok(msg) => {
384 debug!("Send sub/unsub/ack to control plane for message: {:?}", msg);
385 match msg.get_type() {
386 Subscribe(_) => {
387 controller.handle_subscribe_message(msg.get_dst(), &clients).await;
388 }
389 Unsubscribe(_) => {
390 controller.handle_unsubscribe_message(msg.get_dst(), &clients).await;
391 }
392 Publish(_) => {
393 if msg.get_session_message_type() == ProtoSessionMessageType::GroupAck {
394 controller.send_ack_message(msg.get_id(), true, &clients).await;
395 } else {
396 debug!("Ignoring publish message with session type: {:?}", msg.get_session_message_type());
397 }
398 }
399 LinkType(_) => {
400 debug!("received link message from dataplane - this should not happen");
401 }
402 SubscriptionAckType(_) => {
403 controller.inner.subscription_manager.resolve_ack(msg.get_subscription_ack());
404 }
405 }
406 }
407 Err(e) => {
408 error!(error = %e.chain(), "received error from the data plane");
409 continue;
410 }
411 }
412 }
413 None => {
414 debug!("Data plane receiver channel closed.");
415 break;
416 }
417 }
418 }
419 _ = cancellation_token.cancelled() => {
420 debug!("shutting down stream on cancellation token");
421 break;
422 }
423 _ = &mut drain_fut => {
424 debug!("shutting down stream on drain");
425 break;
426 }
427 }
428 }
429 });
430 Ok(())
431 }
432
433 pub fn stop(&mut self) {
437 info!("stopping controller service");
438
439 for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
441 info!(%endpoint, "stopping");
442 token.cancel();
443 }
444 }
445
446 async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
450 if self
451 .controller
452 .inner
453 .cancellation_tokens
454 .read()
455 .contains_key(&client.endpoint)
456 {
457 return Err(ControllerError::ClientAlreadyRunning(client.endpoint));
458 }
459
460 let cancellation_token = CancellationToken::new();
461
462 let tx = self
463 .controller
464 .connect(client.clone(), cancellation_token.clone())
465 .await?;
466
467 self.controller
469 .inner
470 .cancellation_tokens
471 .write()
472 .insert(client.endpoint.clone(), cancellation_token);
473
474 self.controller
476 .inner
477 .tx_channels
478 .write()
479 .insert(client.endpoint.clone(), tx);
480
481 Ok(())
483 }
484
485 pub async fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
489 if self
491 .controller
492 .inner
493 .cancellation_tokens
494 .read()
495 .contains_key(&config.endpoint)
496 {
497 error!(endpoint = config.endpoint, "server is already running",);
498 return Err(ControllerError::ServerAlreadyRunning(config.endpoint));
499 }
500
501 let token = config
502 .run_server(
503 &[ControllerServiceServer::new(self.controller.clone())],
504 self.controller.drain_watch()?,
505 )
506 .await?;
507
508 self.controller
510 .inner
511 .cancellation_tokens
512 .write()
513 .insert(config.endpoint.clone(), token.clone());
514
515 info!(%config.endpoint, "started controlplane server");
516
517 Ok(())
518 }
519}
520
521fn generate_session_id(moderator: &Name, channel: &Name) -> u32 {
522 let mut all: [u64; 8] = [0; 8];
525 let m = moderator.components();
526 let c = channel.components();
527 all[..4].copy_from_slice(m);
528 all[4..].copy_from_slice(c);
529
530 let hash = calculate_hash(&all);
531 (hash ^ (hash >> 32)) as u32
532}
533
534fn get_name_from_string(string_name: &str) -> Result<Name, ControllerError> {
535 let parts: Vec<&str> = string_name.split('/').collect();
536 if parts.len() < 3 {
537 return Err(ControllerError::MalformedName(string_name.to_owned()));
538 }
539
540 if parts.len() == 4 {
541 let id = parts[3]
542 .parse::<u64>()
543 .map_err(|_e| ControllerError::MalformedName(string_name.to_owned()))?;
544 return Ok(Name::from_strings([parts[0], parts[1], parts[2]]).with_id(id));
545 }
546
547 Ok(Name::from_strings([parts[0], parts[1], parts[2]]))
548}
549
550#[allow(clippy::too_many_arguments)]
551fn create_channel_message(
552 source: &Name,
553 destination: &Name,
554 request_type: ProtoSessionMessageType,
555 session_id: u32,
556 message_id: u32,
557 payload: Option<Content>,
558 auth_provider: &Option<AuthProvider>,
559) -> Result<DataPlaneMessage, ControllerError> {
560 let identity_token = match auth_provider {
562 Some(auth) => auth.get_token()?,
563 None => String::new(),
564 };
565
566 let message = DataPlaneMessage::builder()
567 .source(source.clone())
568 .destination(destination.clone())
569 .identity(&identity_token)
570 .session_type(ProtoSessionType::Multicast)
571 .session_message_type(request_type)
572 .session_id(session_id)
573 .message_id(message_id)
574 .payload(payload.ok_or(ControllerError::PayloadMissing)?)
575 .build_publish()?;
576
577 Ok(message)
578}
579
580fn new_channel_message(
581 controller: &Name,
582 moderator: &Name,
583 channel: &Name,
584 message_id: u32,
585 auth_provider: &Option<AuthProvider>,
586) -> Result<DataPlaneMessage, ControllerError> {
587 let session_id = generate_session_id(moderator, channel);
588
589 let invite_payload = Some(
590 CommandPayload::builder()
591 .join_request(
592 true,
593 Some(10),
594 Some(Duration::from_secs(1)),
595 Some(channel.clone()),
596 )
597 .as_content(),
598 );
599
600 let mut msg = create_channel_message(
601 controller,
602 moderator,
603 ProtoSessionMessageType::JoinRequest,
604 session_id,
605 message_id,
606 invite_payload,
607 auth_provider,
608 )?;
609
610 msg.insert_metadata(IS_MODERATOR.to_string(), TRUE_VAL.to_string());
611 Ok(msg)
612}
613
614fn delete_channel_message(
615 controller: &Name,
616 moderator: &Name,
617 channel_name: &Name,
618 msg_id: u32,
619 auth_provider: &Option<AuthProvider>,
620) -> Result<DataPlaneMessage, ControllerError> {
621 let session_id = generate_session_id(moderator, channel_name);
622
623 let payload = Some(CommandPayload::builder().leave_request(None).as_content());
624
625 let mut msg = create_channel_message(
626 controller,
627 moderator,
628 ProtoSessionMessageType::LeaveRequest,
629 session_id,
630 msg_id,
631 payload,
632 auth_provider,
633 )?;
634
635 msg.insert_metadata(DELETE_GROUP.to_string(), TRUE_VAL.to_string());
636 Ok(msg)
637}
638
639fn invite_participant_message(
640 controller: &Name,
641 moderator: &Name,
642 participant: &Name,
643 channel_name: &Name,
644 msg_id: u32,
645 auth_provider: &Option<AuthProvider>,
646) -> Result<DataPlaneMessage, ControllerError> {
647 let session_id = generate_session_id(moderator, channel_name);
648
649 let payload = Some(
650 CommandPayload::builder()
651 .discovery_request(Some(participant.clone()))
652 .as_content(),
653 );
654
655 let msg = create_channel_message(
656 controller,
657 moderator,
658 ProtoSessionMessageType::DiscoveryRequest,
659 session_id,
660 msg_id,
661 payload,
662 auth_provider,
663 )?;
664
665 Ok(msg)
666}
667
668fn remove_participant_message(
669 controller: &Name,
670 moderator: &Name,
671 participant: &Name,
672 channel_name: &Name,
673 msg_id: u32,
674 auth_provider: &Option<AuthProvider>,
675) -> Result<DataPlaneMessage, ControllerError> {
676 let session_id = generate_session_id(moderator, channel_name);
677
678 let payload = Some(
679 CommandPayload::builder()
680 .leave_request(Some(participant.clone()))
681 .as_content(),
682 );
683
684 let msg = create_channel_message(
685 controller,
686 moderator,
687 ProtoSessionMessageType::LeaveRequest,
688 session_id,
689 msg_id,
690 payload,
691 auth_provider,
692 )?;
693
694 Ok(msg)
695}
696
697impl ControllerService {
698 fn resolve_connection_by_link_id(&self, link_id: &str) -> Result<Option<u64>, String> {
699 let mut resolved: Option<u64> = None;
700
701 self.inner
702 .message_processor
703 .connection_table()
704 .for_each(|id, conn| {
705 if conn.link_id().as_deref() == Some(link_id) && resolved.is_none() {
706 resolved = Some(id);
707 }
708 });
709
710 Ok(resolved)
711 }
712
713 fn disconnect_connection_by_link_id(&self, link_id: &str) -> Result<(), String> {
714 if link_id.trim().is_empty() {
715 return Err("link_id cannot be empty".to_string());
716 }
717
718 let conn_id = match self.resolve_connection_by_link_id(link_id)? {
719 Some(id) => id,
720 None => {
721 return Err(format!("Connection with link_id {} not found", link_id));
722 }
723 };
724
725 if let Err(e) = self.inner.message_processor.disconnect(conn_id) {
726 info!(
728 link_id = %link_id,
729 conn_id,
730 error = %e,
731 "Disconnect returned an error; continuing delete flow"
732 );
733 }
734
735 self.inner
737 .connections
738 .write()
739 .retain(|_, mapped| *mapped != conn_id);
740
741 info!(link_id = %link_id, conn_id, "Successfully deleted connection by link_id");
742 Ok(())
743 }
744
745 fn resolve_subscription_connection(
746 &self,
747 subscription: &v1::Subscription,
748 ) -> Result<Option<u64>, String> {
749 if let Some(link_id) = &subscription.link_id {
750 let trimmed = link_id.trim();
751 if !trimmed.is_empty() {
752 return self.resolve_connection_by_link_id(trimmed);
753 }
754 }
755
756 Ok(self
757 .inner
758 .connections
759 .read()
760 .get(&subscription.connection_id)
761 .cloned())
762 }
763
764 async fn handle_new_control_message(
766 &self,
767 msg: ControlMessage,
768 tx: &mpsc::Sender<Result<ControlMessage, Status>>,
769 ) -> Result<(), ControllerError> {
770 match msg.payload {
771 Some(ref payload) => {
772 match payload {
773 Payload::ConfigCommand(config) => {
774 let mut connections_status = Vec::new();
775 let mut subscriptions_status = Vec::new();
776
777 for link_id in &config.connections_to_delete {
779 info!(link_id = %link_id, "received a connection to delete");
780 let mut connection_success = true;
781 let mut connection_error_msg = String::new();
782
783 if let Err(err) = self.disconnect_connection_by_link_id(link_id) {
784 connection_success = false;
785 connection_error_msg = err;
786 }
787
788 connections_status.push(v1::ConnectionAck {
789 connection_id: link_id.clone(),
790 success: connection_success,
791 error_msg: connection_error_msg,
792 });
793 }
794
795 for conn in &config.connections_to_create {
797 info!(?conn, "received a connection to create");
798 let mut connection_success = true;
799 let mut connection_error_msg = String::new();
800
801 match serde_json::from_str::<ClientConfig>(&conn.config_data) {
802 Err(e) => {
803 connection_success = false;
804 connection_error_msg = format!("Failed to parse config: {}", e);
805 }
806 Ok(client_config) => {
807 let client_endpoint = &client_config.endpoint;
808 let requested_link_id =
809 if client_config.link_id.trim().is_empty() {
810 String::new()
811 } else {
812 client_config.link_id.clone()
813 };
814 let mut existing_conn_for_link_id = false;
815
816 if !requested_link_id.is_empty() {
817 match self.resolve_connection_by_link_id(&requested_link_id)
818 {
819 Err(err) => {
820 connection_success = false;
821 connection_error_msg = err;
822 }
823 Ok(Some(conn_id)) => {
824 existing_conn_for_link_id = true;
825 self.inner
826 .connections
827 .write()
828 .insert(client_endpoint.clone(), conn_id);
829 info!(
830 endpoint = %client_endpoint,
831 link_id = %requested_link_id,
832 conn_id,
833 "Connection already exists for link_id"
834 );
835 }
836 Ok(None) => {}
837 }
838 }
839
840 if connection_success && !existing_conn_for_link_id {
841 if !self
843 .inner
844 .connections
845 .read()
846 .contains_key(client_endpoint)
847 {
848 match self
849 .inner
850 .message_processor
851 .connect(client_config.clone(), None, None)
852 .await
853 {
854 Err(e) => {
855 connection_success = false;
856 connection_error_msg =
857 format!("Connection failed: {}", e);
858 }
859 Ok(conn_id) => {
860 self.inner
861 .connections
862 .write()
863 .insert(client_endpoint.clone(), conn_id.1);
864 info!(
865 endpoint = %client_endpoint, "Successfully created connection",
866
867 );
868 }
869 }
870 } else {
871 info!(endpoint = %client_endpoint, "Connection already exists");
872 }
873 }
874 }
875 }
876
877 connections_status.push(v1::ConnectionAck {
879 connection_id: conn.connection_id.clone(),
880 success: connection_success,
881 error_msg: connection_error_msg,
882 });
883 }
884
885 let identity_token = match &self.inner.auth_provider {
887 Some(auth) => auth.get_token()?,
888 None => String::new(),
889 };
890
891 for subscription in &config.subscriptions_to_set {
893 let mut subscription_success = true;
894 let mut subscription_error_msg = String::new();
895
896 let conn = self.resolve_subscription_connection(subscription);
897
898 if let Ok(Some(conn)) = conn {
899 let source = Name::from_strings([
900 subscription.component_0.as_str(),
901 subscription.component_1.as_str(),
902 subscription.component_2.as_str(),
903 ])
904 .with_id(0);
905 let name = Name::from_strings([
906 subscription.component_0.as_str(),
907 subscription.component_1.as_str(),
908 subscription.component_2.as_str(),
909 ])
910 .with_id(subscription.id.unwrap_or(Name::NULL_COMPONENT));
911
912 let msg = DataPlaneMessage::builder()
913 .source(source.clone())
914 .destination(name.clone())
915 .identity(&identity_token)
916 .flags(SlimHeaderFlags::default().with_recv_from(conn))
917 .build_subscribe()
918 .unwrap();
919
920 match self.send_subscribe_message_with_ack(msg).await {
921 Ok(subscription_id) => {
922 self.inner
924 .route_subscription_ids
925 .lock()
926 .insert((name.clone(), conn), subscription_id);
927 info!(?subscription, "Successfully created subscription");
928 }
929 Err(err) => {
930 subscription_success = false;
931 subscription_error_msg =
932 format!("Failed to subscribe: {}", err);
933 }
934 }
935 } else {
936 subscription_success = false;
937 subscription_error_msg = match conn {
938 Ok(None) => {
939 if let Some(link_id) = &subscription.link_id {
940 format!("Connection with link_id {} not found", link_id)
941 } else {
942 format!(
943 "Connection {} not found",
944 subscription.connection_id
945 )
946 }
947 }
948 Err(err) => err,
949 _ => "unknown connection lookup error".to_string(),
950 };
951 }
952
953 subscriptions_status.push(v1::SubscriptionAck {
955 subscription: Some(subscription.clone()),
956 success: subscription_success,
957 error_msg: subscription_error_msg,
958 });
959 }
960
961 for subscription in &config.subscriptions_to_delete {
963 let mut subscription_success = true;
964 let mut subscription_error_msg = String::new();
965
966 let conn = self.resolve_subscription_connection(subscription);
967
968 if let Ok(Some(conn)) = conn {
969 let source = Name::from_strings([
970 subscription.component_0.as_str(),
971 subscription.component_1.as_str(),
972 subscription.component_2.as_str(),
973 ])
974 .with_id(0);
975 let name = Name::from_strings([
976 subscription.component_0.as_str(),
977 subscription.component_1.as_str(),
978 subscription.component_2.as_str(),
979 ])
980 .with_id(subscription.id.unwrap_or(Name::NULL_COMPONENT));
981
982 let msg = DataPlaneMessage::builder()
983 .source(source.clone())
984 .destination(name.clone())
985 .identity(&identity_token)
986 .flags(SlimHeaderFlags::default().with_recv_from(conn))
987 .build_unsubscribe()
988 .unwrap();
989
990 let sub_id = self
991 .inner
992 .route_subscription_ids
993 .lock()
994 .remove(&(name.clone(), conn));
995 match sub_id {
996 Some(subscription_id) => {
997 if let Err(err) = self
998 .send_unsubscribe_message_with_ack(msg, subscription_id)
999 .await
1000 {
1001 subscription_success = false;
1002 subscription_error_msg =
1003 format!("Failed to unsubscribe: {}", err);
1004 } else {
1005 info!(
1006 ?subscription,
1007 "Successfully deleted subscription"
1008 );
1009 }
1010 }
1011 None => {
1012 subscription_success = false;
1013 subscription_error_msg = format!(
1014 "No subscription_id found for ({}, {})",
1015 name, conn
1016 );
1017 }
1018 }
1019 } else {
1020 subscription_success = false;
1021 subscription_error_msg = match conn {
1022 Ok(None) => {
1023 if let Some(link_id) = &subscription.link_id {
1024 format!("Connection with link_id {} not found", link_id)
1025 } else {
1026 format!(
1027 "Connection {} not found",
1028 subscription.connection_id
1029 )
1030 }
1031 }
1032 Err(err) => err,
1033 _ => "unknown connection lookup error".to_string(),
1034 };
1035 }
1036
1037 subscriptions_status.push(v1::SubscriptionAck {
1039 subscription: Some(subscription.clone()),
1040 success: subscription_success,
1041 error_msg: subscription_error_msg,
1042 });
1043 }
1044
1045 let config_ack = v1::ConfigurationCommandAck {
1047 original_message_id: msg.message_id.clone(),
1048 connections_status,
1049 subscriptions_status,
1050 };
1051
1052 let reply = ControlMessage {
1053 message_id: uuid::Uuid::new_v4().to_string(),
1054 payload: Some(Payload::ConfigCommandAck(config_ack)),
1055 };
1056
1057 if let Err(e) = tx.send(Ok(reply)).await {
1058 error!(error = %e.chain(), "failed to send ConfigurationCommandAck");
1059 }
1060
1061 info!(
1062 connections = %config.connections_to_create.len(),
1063 connections_to_delete = %config.connections_to_delete.len(),
1064 subscriptions_to_set = %config.subscriptions_to_set.len(),
1065 subscriptions_to_del = %config.subscriptions_to_delete.len(),
1066 "Processed ConfigurationCommand"
1067 );
1068 }
1069 Payload::SubscriptionListRequest(_) => {
1070 const CHUNK_SIZE: usize = 100;
1071
1072 let conn_table = self.inner.message_processor.connection_table();
1073 let mut entries = Vec::new();
1074
1075 self.inner.message_processor.subscription_table().for_each(
1076 |name, id, local, remote| {
1077 let mut entry = SubscriptionEntry {
1078 component_0: name.components_strings()[0].to_string(),
1079 component_1: name.components_strings()[1].to_string(),
1080 component_2: name.components_strings()[2].to_string(),
1081 id: Some(id),
1082 ..Default::default()
1083 };
1084
1085 for &cid in local {
1086 entry.local_connections.push(ConnectionEntry {
1087 id: cid,
1088 connection_type: ConnectionType::Local as i32,
1089 config_data: "{}".to_string(),
1090 link_id: None,
1091 direction: ConnectionDirection::Outgoing as i32,
1092 });
1093 }
1094
1095 for &cid in remote {
1096 if let Some(conn) = conn_table.get(cid) {
1097 entry.remote_connections.push(ConnectionEntry {
1098 id: cid,
1099 connection_type: ConnectionType::Remote as i32,
1100 config_data: match conn.config_data() {
1101 Some(data) => serde_json::to_string(data)
1102 .unwrap_or_else(|_| "{}".to_string()),
1103 None => "{}".to_string(),
1104 },
1105 link_id: conn.link_id(),
1106 direction: if conn.is_outgoing() {
1107 ConnectionDirection::Outgoing as i32
1108 } else {
1109 ConnectionDirection::Incoming as i32
1110 },
1111 });
1112 } else {
1113 error!(%cid, "no connection entry for id");
1114 }
1115 }
1116 entries.push(entry);
1117 },
1118 );
1119
1120 for chunk in entries.chunks(CHUNK_SIZE) {
1121 let resp = ControlMessage {
1122 message_id: uuid::Uuid::new_v4().to_string(),
1123 payload: Some(Payload::SubscriptionListResponse(
1124 SubscriptionListResponse {
1125 original_message_id: msg.message_id.clone(),
1126 entries: chunk.to_vec(),
1127 },
1128 )),
1129 };
1130
1131 if let Err(e) = tx.try_send(Ok(resp)) {
1132 error!(error = %e.chain(), "failed to send subscription batch");
1133 }
1134 }
1135 }
1136 Payload::ConnectionListRequest(_) => {
1137 let mut all_entries = Vec::new();
1138 self.inner
1139 .message_processor
1140 .connection_table()
1141 .for_each(|id, conn| {
1142 info!(
1143 conn_id = id,
1144 local_addr = ?conn.local_addr(),
1145 remote_addr = ?conn.remote_addr(),
1146 is_outgoing = conn.is_outgoing(),
1147 link_id = ?conn.link_id(),
1148 "connection entry",
1149 );
1150 all_entries.push(ConnectionEntry {
1151 id,
1152 connection_type: ConnectionType::Remote as i32,
1153 config_data: match conn.config_data() {
1154 Some(data) => serde_json::to_string(data)
1155 .unwrap_or_else(|_| "{}".to_string()),
1156 None => "{}".to_string(),
1157 },
1158 link_id: conn.link_id(),
1159 direction: if conn.is_outgoing() {
1160 ConnectionDirection::Outgoing as i32
1161 } else {
1162 ConnectionDirection::Incoming as i32
1163 },
1164 });
1165 });
1166
1167 const CHUNK_SIZE: usize = 100;
1168 for chunk in all_entries.chunks(CHUNK_SIZE) {
1169 let resp = ControlMessage {
1170 message_id: uuid::Uuid::new_v4().to_string(),
1171 payload: Some(Payload::ConnectionListResponse(
1172 ConnectionListResponse {
1173 original_message_id: msg.message_id.clone(),
1174 entries: chunk.to_vec(),
1175 },
1176 )),
1177 };
1178
1179 if let Err(e) = tx.try_send(Ok(resp)) {
1180 error!(error = %e.chain(), "failed to send connection list batch");
1181 }
1182 }
1183 }
1184 Payload::Ack(_ack) => {
1185 }
1187 Payload::ConfigCommandAck(_) => {
1188 }
1190 Payload::SubscriptionListResponse(_) => {
1191 }
1193 Payload::ConnectionListResponse(_) => {
1194 }
1196 Payload::RegisterNodeRequest(_) => {
1197 error!("received a register node request");
1198 }
1199 Payload::RegisterNodeResponse(_) => {
1200 }
1202 Payload::DeregisterNodeRequest(_) => {
1203 error!("received a deregister node request");
1204 }
1205 Payload::DeregisterNodeResponse(_) => {
1206 }
1208 Payload::CreateChannelRequest(req) => {
1209 info!("received a channel create request");
1210
1211 let mut success = true;
1212 if let Some(first_moderator) = req.moderators.first() {
1214 let moderator_name = get_name_from_string(first_moderator)?;
1215 if !moderator_name.has_id() {
1216 error!("missing moderator ID");
1217 success = false;
1218 } else {
1219 let channel_name = get_name_from_string(&req.channel_name)?;
1220 let new_msg_id = rand::random::<u32>();
1221 let controller_name = self.inner.controller_name.clone();
1222 let creation_msg = new_channel_message(
1223 &controller_name,
1224 &moderator_name,
1225 &channel_name,
1226 new_msg_id,
1227 &self.inner.auth_provider,
1228 )?;
1229
1230 debug!("send session creation message: {:?}", creation_msg);
1231 if let Err(e) = self.send_control_message(creation_msg).await {
1232 error!(error = %e.chain(), "failed to send channel creation");
1233 success = false;
1234 } else {
1235 debug!(
1237 "create timer for message id: {} with type {:?}",
1238 new_msg_id,
1239 ProtoSessionMessageType::JoinRequest
1240 );
1241 let timer =
1242 self.inner.timer_factory.read().as_ref().map(|factory| {
1243 factory.create_and_start_timer(
1244 new_msg_id,
1245 ProtoSessionMessageType::JoinRequest,
1246 None,
1247 )
1248 });
1249 self.inner
1250 .message_id_map
1251 .write()
1252 .insert(new_msg_id, (msg.message_id.clone(), timer));
1253 }
1254 }
1255 } else {
1256 error!("no moderators specified in create channel request message");
1257 success = false;
1258 };
1259
1260 if !success {
1261 let ack = Ack {
1262 original_message_id: msg.message_id.clone(),
1263 success,
1264 messages: vec![msg.message_id.clone()],
1265 };
1266
1267 let reply = ControlMessage {
1268 message_id: uuid::Uuid::new_v4().to_string(),
1269 payload: Some(Payload::Ack(ack)),
1270 };
1271
1272 if let Err(e) = tx.send(Ok(reply)).await {
1273 error!(error = %e.chain(), "failed to send ack");
1274 }
1275 }
1276 }
1277 Payload::DeleteChannelRequest(req) => {
1278 info!("received a channel delete request");
1279 let mut success = true;
1280
1281 if let Some(first_moderator) = req.moderators.first() {
1283 let moderator_name = get_name_from_string(first_moderator)?;
1284 if !moderator_name.has_id() {
1285 error!("missing moderator ID");
1286 success = false;
1287 } else {
1288 let channel_name = get_name_from_string(&req.channel_name)?;
1289 let new_msg_id = rand::random::<u32>();
1290 let controller_name = self.inner.controller_name.clone();
1291 let delete_msg = delete_channel_message(
1292 &controller_name,
1293 &moderator_name,
1294 &channel_name,
1295 new_msg_id,
1296 &self.inner.auth_provider,
1297 )?;
1298
1299 debug!("Send delete session message: {:?}", delete_msg);
1300 if let Err(e) = self.send_control_message(delete_msg).await {
1301 error!(error = %e.chain(), "failed to send delete channel");
1302 success = false;
1303 } else {
1304 debug!(
1306 "create timer for message id: {} with type {:?}",
1307 new_msg_id,
1308 ProtoSessionMessageType::LeaveRequest
1309 );
1310 let timer =
1311 self.inner.timer_factory.read().as_ref().map(|factory| {
1312 factory.create_and_start_timer(
1313 new_msg_id,
1314 ProtoSessionMessageType::LeaveRequest,
1315 None,
1316 )
1317 });
1318
1319 self.inner
1320 .message_id_map
1321 .write()
1322 .insert(new_msg_id, (msg.message_id.clone(), timer));
1323 }
1324 }
1325 } else {
1326 error!("no moderators specified in delete channel request");
1327 success = false;
1328 };
1329
1330 if !success {
1331 let ack = Ack {
1332 original_message_id: msg.message_id.clone(),
1333 success,
1334 messages: vec![msg.message_id.clone()],
1335 };
1336
1337 let reply = ControlMessage {
1338 message_id: uuid::Uuid::new_v4().to_string(),
1339 payload: Some(Payload::Ack(ack)),
1340 };
1341
1342 if let Err(e) = tx.send(Ok(reply)).await {
1343 error!(error = %e.chain(), "failed to send ack");
1344 }
1345 }
1346 }
1347 Payload::AddParticipantRequest(req) => {
1348 info!(
1349 channel_name = %req.channel_name,
1350 participant_name = %req.participant_name,
1351 "received a participant add request",
1352 );
1353
1354 let mut success = true;
1355
1356 if let Some(first_moderator) = req.moderators.first() {
1357 let moderator_name = get_name_from_string(first_moderator)?;
1358 if !moderator_name.has_id() {
1359 error!("missing moderator ID");
1360 success = false;
1361 } else {
1362 let channel_name = get_name_from_string(&req.channel_name)?;
1363 let participant_name = get_name_from_string(&req.participant_name)?;
1364 let new_msg_id = rand::random::<u32>();
1365 let controller_name = self.inner.controller_name.clone();
1366 let invite_msg = invite_participant_message(
1367 &controller_name,
1368 &moderator_name,
1369 &participant_name,
1370 &channel_name,
1371 new_msg_id,
1372 &self.inner.auth_provider,
1373 )?;
1374
1375 debug!(?invite_msg, "Send invite participant");
1376
1377 if let Err(e) = self.send_control_message(invite_msg).await {
1378 error!(error = %e.chain(), "failed to send channel creation");
1379 success = false;
1380 } else {
1381 debug!(
1383 "create timer for message id: {} with type {:?}",
1384 new_msg_id,
1385 ProtoSessionMessageType::DiscoveryRequest
1386 );
1387 let timer =
1388 self.inner.timer_factory.read().as_ref().map(|factory| {
1389 factory.create_and_start_timer(
1390 new_msg_id,
1391 ProtoSessionMessageType::DiscoveryRequest,
1392 None,
1393 )
1394 });
1395 self.inner
1396 .message_id_map
1397 .write()
1398 .insert(new_msg_id, (msg.message_id.clone(), timer));
1399 }
1400 }
1401 } else {
1402 error!("no moderators specified in add participant request");
1403 };
1404
1405 if !success {
1406 let ack = Ack {
1407 original_message_id: msg.message_id.clone(),
1408 success,
1409 messages: vec![msg.message_id.clone()],
1410 };
1411
1412 let reply = ControlMessage {
1413 message_id: uuid::Uuid::new_v4().to_string(),
1414 payload: Some(Payload::Ack(ack)),
1415 };
1416
1417 if let Err(e) = tx.send(Ok(reply)).await {
1418 error!(error = %e.chain(), "failed to send ack");
1419 }
1420 }
1421 }
1422 Payload::DeleteParticipantRequest(req) => {
1423 info!("received a participant delete request");
1424
1425 let mut success = true;
1426
1427 if let Some(first_moderator) = req.moderators.first() {
1428 let moderator_name = get_name_from_string(first_moderator)?;
1429 if !moderator_name.has_id() {
1430 error!("missing moderator ID");
1431 success = false;
1432 } else {
1433 let channel_name = get_name_from_string(&req.channel_name)?;
1434 let participant_name = get_name_from_string(&req.participant_name)?;
1435 let new_msg_id = rand::random::<u32>();
1436 let controller_name = self.inner.controller_name.clone();
1437 let remove_msg = remove_participant_message(
1438 &controller_name,
1439 &moderator_name,
1440 &participant_name,
1441 &channel_name,
1442 new_msg_id,
1443 &self.inner.auth_provider,
1444 )?;
1445
1446 if let Err(e) = self.send_control_message(remove_msg).await {
1447 error!(error = %e.chain(), "failed to send delete participant request");
1448 success = false;
1449 } else {
1450 debug!(
1452 "create timer for message id: {} with type {:?}",
1453 new_msg_id,
1454 ProtoSessionMessageType::LeaveRequest
1455 );
1456 let timer =
1457 self.inner.timer_factory.read().as_ref().map(|factory| {
1458 factory.create_and_start_timer(
1459 new_msg_id,
1460 ProtoSessionMessageType::LeaveRequest,
1461 None,
1462 )
1463 });
1464 self.inner
1465 .message_id_map
1466 .write()
1467 .insert(new_msg_id, (msg.message_id.clone(), timer));
1468 }
1469 }
1470 } else {
1471 error!("no moderators specified in remove participant request");
1472 success = false;
1473 };
1474
1475 if !success {
1476 let ack = Ack {
1477 original_message_id: msg.message_id.clone(),
1478 success,
1479 messages: vec![msg.message_id.clone()],
1480 };
1481
1482 let reply = ControlMessage {
1483 message_id: uuid::Uuid::new_v4().to_string(),
1484 payload: Some(Payload::Ack(ack)),
1485 };
1486
1487 if let Err(e) = tx.send(Ok(reply)).await {
1488 error!(error = %e.chain(), "failed to send ack");
1489 }
1490 }
1491 }
1492 Payload::ListChannelRequest(_) => {}
1493 Payload::ListChannelResponse(_) => {}
1494 Payload::ListParticipantsRequest(_) => {}
1495 Payload::ListParticipantsResponse(_) => {}
1496 }
1497 }
1498 None => {
1499 error!(
1500 message_id = %msg.message_id,
1501 "received control message with no payload",
1502 );
1503 }
1504 }
1505
1506 Ok(())
1507 }
1508
1509 async fn handle_subscribe_message(&self, dst: Name, clients: &[ClientConfig]) {
1510 let mut sub_vec = vec![];
1511
1512 let components = dst.components_strings();
1513 let cmd = v1::Subscription {
1514 component_0: components[0].to_string(),
1515 component_1: components[1].to_string(),
1516 component_2: components[2].to_string(),
1517 id: Some(dst.id()),
1518 connection_id: "n/a".to_string(),
1519 node_id: None,
1520 link_id: None,
1521 direction: None,
1522 };
1523
1524 sub_vec.push(cmd);
1525
1526 let ctrl = ControlMessage {
1527 message_id: uuid::Uuid::new_v4().to_string(),
1528 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1529 connections_to_create: vec![],
1530 connections_to_delete: vec![],
1531 subscriptions_to_set: sub_vec,
1532 subscriptions_to_delete: vec![],
1533 })),
1534 };
1535
1536 return self.send_or_queue_notification(ctrl, clients).await;
1537 }
1538
1539 async fn handle_unsubscribe_message(&self, dst: Name, clients: &[ClientConfig]) {
1540 let mut unsub_vec = vec![];
1541
1542 let components = dst.components_strings();
1543 let cmd = v1::Subscription {
1544 component_0: components[0].to_string(),
1545 component_1: components[1].to_string(),
1546 component_2: components[2].to_string(),
1547 id: Some(dst.id()),
1548 connection_id: "n/a".to_string(),
1549 node_id: None,
1550 link_id: None,
1551 direction: None,
1552 };
1553
1554 unsub_vec.push(cmd);
1555
1556 let ctrl = ControlMessage {
1557 message_id: uuid::Uuid::new_v4().to_string(),
1558 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
1559 connections_to_create: vec![],
1560 connections_to_delete: vec![],
1561 subscriptions_to_set: vec![],
1562 subscriptions_to_delete: unsub_vec,
1563 })),
1564 };
1565
1566 return self.send_or_queue_notification(ctrl, clients).await;
1567 }
1568
1569 async fn send_subscribe_message_with_ack(
1571 &self,
1572 mut msg: DataPlaneMessage,
1573 ) -> Result<u64, String> {
1574 let (ack_id, ack_rx) = self.inner.subscription_manager.register_ack();
1575 msg.set_subscription_id(ack_id);
1576
1577 if let Err(e) = self.send_control_message(msg).await {
1578 self.inner.subscription_manager.cancel_ack(ack_id);
1579 return Err(format!("datapath send error: {}", e.chain()));
1580 }
1581
1582 match ack_rx.await {
1583 Ok(Ok(())) => Ok(ack_id),
1584 Ok(Err(err)) => Err(err.to_string()),
1585 Err(_) => Err("subscription ack channel closed".to_string()),
1586 }
1587 }
1588
1589 async fn send_unsubscribe_message_with_ack(
1591 &self,
1592 mut msg: DataPlaneMessage,
1593 subscription_id: u64,
1594 ) -> Result<(), String> {
1595 let ack_rx = self
1596 .inner
1597 .subscription_manager
1598 .register_ack_with_id(subscription_id);
1599 msg.set_subscription_id(subscription_id);
1600
1601 if let Err(e) = self.send_control_message(msg).await {
1602 self.inner.subscription_manager.cancel_ack(subscription_id);
1603 return Err(format!("datapath send error: {}", e.chain()));
1604 }
1605
1606 match ack_rx.await {
1607 Ok(Ok(())) => Ok(()),
1608 Ok(Err(err)) => Err(err.to_string()),
1609 Err(_) => Err("subscription ack channel closed".to_string()),
1610 }
1611 }
1612
1613 async fn send_ack_message(&self, msg_id: u32, success: bool, clients: &[ClientConfig]) {
1616 let original_message_id = self.inner.message_id_map.write().remove(&msg_id);
1617 match original_message_id {
1618 Some(entry) => {
1619 debug!("Received GroupAck for message ID: {}", entry.0);
1620 if let Some(mut timer) = entry.1 {
1622 timer.stop();
1623 }
1624
1625 let ack = Ack {
1626 original_message_id: entry.0,
1627 success,
1628 messages: vec![msg_id.to_string()],
1629 };
1630
1631 let reply = ControlMessage {
1632 message_id: uuid::Uuid::new_v4().to_string(),
1633 payload: Some(Payload::Ack(ack)),
1634 };
1635
1636 self.send_or_queue_notification(reply, clients).await;
1637 }
1638 None => {
1639 debug!("Received GroupAck for unknown message ID: {}", msg_id);
1640 }
1641 }
1642 }
1643
1644 async fn send_control_message(&self, msg: DataPlaneMessage) -> Result<(), ControllerError> {
1646 self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
1647 error!(error = %e.chain(), "error sending message into datapath");
1648 ControllerError::Datapath(slim_datapath::errors::DataPathError::ConnectionError)
1649 })
1650 }
1651
1652 async fn send_or_queue_notification(&self, ctrl_msg: ControlMessage, clients: &[ClientConfig]) {
1654 let mut has_active_connection = false;
1655
1656 for c in clients {
1658 let tx = match self.inner.tx_channels.read().get(&c.endpoint) {
1659 Some(tx) => tx.clone(),
1660 None => continue,
1661 };
1662
1663 if tx.send(Ok(ctrl_msg.clone())).await.is_ok() {
1664 has_active_connection = true;
1665 } else {
1666 debug!(
1667 endpoint = %c.endpoint,
1668 "failed to send notification to control plane"
1669 );
1670 }
1671 }
1672
1673 if !has_active_connection {
1675 debug!("no active control plane connections, queuing subscription notification");
1676
1677 let mut queue = self.inner.pending_notifications.lock();
1678 if queue.len() >= MAX_QUEUED_NOTIFICATIONS {
1679 queue.remove(0);
1681 debug!("queue full, removed oldest notification");
1682 }
1683 queue.push(ctrl_msg);
1684 }
1685 }
1686
1687 fn drain_watch(&self) -> Result<drain::Watch, ControllerError> {
1689 self.inner
1690 .drain_watch
1691 .read()
1692 .clone()
1693 .ok_or(ControllerError::AlreadyStopped)
1694 }
1695
1696 async fn send_queued_notifications(
1698 &self,
1699 tx: &mpsc::Sender<Result<ControlMessage, Status>>,
1700 endpoint: &str,
1701 ) {
1702 let notifications = {
1703 let mut queue = self.inner.pending_notifications.lock();
1704 if queue.is_empty() {
1705 return;
1706 }
1707 queue.drain(..).collect::<Vec<_>>()
1708 };
1709
1710 if notifications.is_empty() {
1711 return;
1712 }
1713
1714 debug!(
1715 "sending {} queued subscription notifications to {}",
1716 notifications.len(),
1717 endpoint
1718 );
1719
1720 let mut failed_notifications = Vec::new();
1721 for notification in notifications {
1722 if let Err(e) = tx.send(Ok(notification)).await {
1723 error!(
1724 error = %e.chain(),
1725 %endpoint,
1726 "failed to send queued notification to control plane",
1727 );
1728
1729 failed_notifications.push(e.0.unwrap());
1731 }
1732 }
1733
1734 if !failed_notifications.is_empty() {
1736 self.inner
1737 .pending_notifications
1738 .lock()
1739 .extend(failed_notifications);
1740 }
1741 }
1742
/// Spawns the task that services one control-plane stream.
///
/// The task optionally sends a `RegisterNodeRequest`, then loops over four
/// event sources until one of them ends the loop: incoming control
/// messages, timer failures, the cancellation token, and the drain signal.
///
/// * `config` - client configuration of the remote control plane. `connect`
///   passes `Some(..)`; the gRPC server handler (`open_control_channel`)
///   passes `None`. Registration and reconnection only happen when this is
///   `Some`.
/// * `stream` - incoming control messages.
/// * `timer_rx` - optional receiver of timer events; when `None` the timer
///   branch never fires.
/// * `tx` - outgoing side of the control channel, used for the register
///   request and passed to `handle_new_control_message` for replies.
/// * `cancellation_token` - stops the loop when cancelled; also forwarded to
///   `connect` on a reconnect attempt.
///
/// # Errors
///
/// Returns the error from `drain_watch` (the controller is already stopped)
/// before anything is spawned.
fn process_control_message_stream(
    &self,
    config: Option<ClientConfig>,
    mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
    mut timer_rx: Option<mpsc::Receiver<SessionMessage>>,
    tx: mpsc::Sender<Result<ControlMessage, Status>>,
    cancellation_token: CancellationToken,
) -> Result<JoinHandle<()>, ControllerError> {
    let this = self.clone();
    let watch = self.drain_watch()?;
    // Separate copy for the timer branch; `config` itself is consumed by the
    // reconnect step at the end of the task.
    let clients = config.clone();

    let handle = tokio::spawn(async move {
        // Endpoint label used for logging; "unknown" on the server side.
        let endpoint = config
            .as_ref()
            .map(|c| c.endpoint.clone())
            .unwrap_or_else(|| "unknown".to_string());
        info!(%endpoint, "connected to control plane");

        // Set only when the stream fails or ends; cancellation and drain
        // leave it false so no reconnect is attempted on shutdown.
        let mut retry_connect = false;

        let register_request = ControlMessage {
            message_id: uuid::Uuid::new_v4().to_string(),
            payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
                node_id: this.inner.id.to_string(),
                group_name: this.inner.group_name.clone(),
                connection_details: this.inner.connection_details.clone(),
            })),
        };

        // Register only when we have a client config (outgoing connection).
        // If the request cannot even be queued, give up on this stream.
        if config.is_some()
            && let Err(e) = tx.send(Ok(register_request)).await
        {
            error!(error = %e.chain(), "failed to send register request");
            return;
        }

        // Pin the drain future once so it can be polled by reference on
        // every loop iteration.
        let mut drain_fut = std::pin::pin!(watch.clone().signaled());

        loop {
            tokio::select! {
                // Incoming control message (or stream termination).
                next = stream.next() => {
                    match next {
                        Some(Ok(msg)) => {
                            // Handler errors are logged; the loop keeps running.
                            if let Err(e) = this.handle_new_control_message(msg, &tx).await {
                                error!(error = %e.chain(), "error processing incoming control message");
                            }
                        }
                        Some(Err(e)) => {
                            // A broken pipe is treated as a normal peer close
                            // and logged at info level; everything else is an error.
                            if let Some(io_err) = Self::match_for_io_error(&e) {
                                if io_err.kind() == std::io::ErrorKind::BrokenPipe {
                                    info!("connection closed by peer");
                                } else {
                                    error!(
                                        error = %e.chain(),
                                        io_error_kind = ?io_err.kind(),
                                        "IO error receiving control messages"
                                    );
                                }
                            } else {
                                error!(error = %e.chain(), "error receiving control messages");
                            }

                            retry_connect = true;
                            break;
                        }
                        None => {
                            debug!("end of stream");
                            retry_connect = true;
                            break;
                        }
                    }
                }
                // Timer events. Without a receiver this future never
                // completes, so the branch is effectively disabled.
                Some(session_msg) = async {
                    match &mut timer_rx {
                        Some(rx) => rx.recv().await,
                        None => std::future::pending().await,
                    }
                } => {
                    match session_msg {
                        SessionMessage::TimerFailure { message_id, message_type: _, name: _, timeouts: _} => {
                            tracing::info!("got a failure for message id: {}", message_id);
                            // Report the timed-out request as a failed ack, but
                            // only when there is a client to report to.
                            if let Some(clients) = &clients {
                                this.send_ack_message(message_id, false, std::slice::from_ref(clients)).await;
                            }
                        }
                        _ => {
                            error!("unexpected session message received in controller");
                        }
                    }
                }
                _ = cancellation_token.cancelled() => {
                    debug!("shutting down stream on cancellation token");
                    break;
                }
                _ = &mut drain_fut => {
                    debug!("shutting down stream on drain");
                    break;
                }
            }
        }

        info!(%endpoint, "control plane stream closed");

        // Reconnect only for outgoing connections whose stream failed or
        // ended. `connect` is attempted once here; on success the new
        // sender is stored in `tx_channels` under the endpoint.
        if retry_connect && let Some(config) = config {
            info!(%config.endpoint, "retrying connection to control plane");
            this.connect(config.clone(), cancellation_token)
                .await
                .map_or_else(
                    |e| {
                        error!(error = %e.chain(), "failed to reconnect to control plane");
                    },
                    |tx| {
                        info!(%config.endpoint, "reconnected to control plane");

                        this.inner
                            .tx_channels
                            .write()
                            .insert(config.endpoint.clone(), tx);
                    },
                )
        }
    });

    Ok(handle)
}
1877
1878 async fn connect(
1882 &self,
1883 config: ClientConfig,
1884 cancellation_token: CancellationToken,
1885 ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
1886 info!(%config.endpoint, "connecting to control plane");
1887
1888 let channel = config.to_channel().await?;
1889
1890 let mut client = ControllerServiceClient::new(channel.clone());
1891 let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1892 let out_stream = ReceiverStream::new(rx).map(|res| res.expect("mapping error"));
1893 let stream = client
1894 .open_control_channel(Request::new(out_stream))
1895 .await?;
1896
1897 self.send_queued_notifications(&tx, &config.endpoint).await;
1898
1899 let timer_settings = TimerSettings::new(
1900 Duration::from_millis(2000),
1901 None,
1902 Some(0),
1903 TimerType::Constant,
1904 );
1905 let (timer_tx, timer_rx) = mpsc::channel::<SessionMessage>(128);
1906 let timer_factory = TimerFactory::new(timer_settings, timer_tx.clone());
1907 self.inner.timer_factory.write().replace(timer_factory);
1908
1909 self.process_control_message_stream(
1911 Some(config),
1912 stream.into_inner(),
1913 Some(timer_rx),
1914 tx.clone(),
1915 cancellation_token.clone(),
1916 )?;
1917
1918 Ok(tx)
1920 }
1921
1922 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1923 let mut err: &(dyn std::error::Error + 'static) = err_status;
1924
1925 loop {
1926 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1927 return Some(io_err);
1928 }
1929
1930 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1933 && let Some(io_err) = h2_err.get_io()
1934 {
1935 return Some(io_err);
1936 }
1937
1938 err = err.source()?;
1939 }
1940 }
1941}
1942
1943#[tonic::async_trait]
1944impl GrpcControllerService for ControllerService {
1945 type OpenControlChannelStream =
1946 Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
1947
1948 async fn open_control_channel(
1949 &self,
1950 request: Request<tonic::Streaming<ControlMessage>>,
1951 ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
1952 let remote_endpoint = request
1954 .remote_addr()
1955 .map(|addr| addr.to_string())
1956 .unwrap_or_else(|| "unknown".to_string());
1957
1958 let stream = request.into_inner();
1959 let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
1960
1961 let cancellation_token = CancellationToken::new();
1962
1963 self.process_control_message_stream(
1965 None,
1966 stream,
1967 None,
1968 tx.clone(),
1969 cancellation_token.clone(),
1970 )
1971 .map_err(|e| {
1972 error!(error = %e.chain(), "error processing control message stream");
1973 Status::unavailable("failed to process control message stream")
1974 })?;
1975
1976 self.inner
1978 .tx_channels
1979 .write()
1980 .insert(remote_endpoint.clone(), tx);
1981
1982 self.inner
1984 .cancellation_tokens
1985 .write()
1986 .insert(remote_endpoint.clone(), cancellation_token);
1987
1988 let out_stream = ReceiverStream::new(rx);
1989 Ok(Response::new(
1990 Box::pin(out_stream) as Self::OpenControlChannelStream
1991 ))
1992 }
1993}
1994
1995#[cfg(test)]
1996mod tests {
1997 use super::*;
1998 use slim_auth::shared_secret::SharedSecret;
1999 use slim_config::component::id::Kind;
2000 use slim_testing::utils::TEST_VALID_SECRET;
2001 use tracing_test::traced_test;
2002
2003 async fn setup_control_planes(
2005 server_endpoint: &str,
2006 server_name: &str,
2007 client_name: &str,
2008 ) -> (ControlPlane, ControlPlane, ClientConfig) {
2009 let id_server = ID::new_with_name(Kind::new("slim").unwrap(), server_name).unwrap();
2010 let id_client = ID::new_with_name(Kind::new("slim").unwrap(), client_name).unwrap();
2011
2012 let server_config = ServerConfig::with_endpoint(server_endpoint)
2013 .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
2014 let client_config = ClientConfig::with_endpoint(&format!("http://{}", server_endpoint))
2015 .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());
2016
2017 let message_processor_server = MessageProcessor::new();
2018 let message_processor_client = MessageProcessor::new();
2019
2020 let control_plane_server = ControlPlane::new(ControlPlaneSettings {
2021 id: id_server,
2022 group_name: None,
2023 servers: vec![server_config.clone()],
2024 clients: vec![],
2025 message_processor: Arc::new(message_processor_server),
2026 auth_provider: Some(AuthProvider::SharedSecret(
2027 SharedSecret::new("server", TEST_VALID_SECRET).unwrap(),
2028 )),
2029 auth_verifier: Some(AuthVerifier::SharedSecret(
2030 SharedSecret::new("server", TEST_VALID_SECRET).unwrap(),
2031 )),
2032 connection_details: vec![from_server_config(&server_config)],
2033 });
2034
2035 let control_plane_client = ControlPlane::new(ControlPlaneSettings {
2036 id: id_client,
2037 group_name: None,
2038 servers: vec![],
2039 clients: vec![client_config.clone()],
2040 message_processor: Arc::new(message_processor_client),
2041 auth_provider: Some(AuthProvider::SharedSecret(
2042 SharedSecret::new("client", TEST_VALID_SECRET).unwrap(),
2043 )),
2044 auth_verifier: Some(AuthVerifier::SharedSecret(
2045 SharedSecret::new("client", TEST_VALID_SECRET).unwrap(),
2046 )),
2047 connection_details: vec![],
2048 });
2049
2050 (control_plane_server, control_plane_client, client_config)
2051 }
2052
2053 #[tokio::test]
2054 #[traced_test]
2055 async fn test_end_to_end() {
2056 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2057 setup_control_planes(
2058 "127.0.0.1:50051",
2059 "test-server-instance",
2060 "test-client-instance",
2061 )
2062 .await;
2063
2064 control_plane_server.run().await.unwrap();
2065 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2066 control_plane_client.run().await.unwrap();
2067 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2068
2069 assert!(logs_contain("received a register node request"));
2070 }
2071
/// `generate_session_id` must be deterministic, must change when either the
/// moderator id or the channel name changes, and must be non-zero for
/// these inputs.
#[test]
fn test_generate_session_id() {
    let moderator_a = Name::from_strings(["Org", "Ns", "Moderator"]).with_id(42);
    let moderator_b = Name::from_strings(["Org", "Ns", "Moderator"]).with_id(43);
    let channel_x = Name::from_strings(["Org", "Ns", "ChannelX"]).with_id(7);
    let channel_y = Name::from_strings(["Org", "Ns", "ChannelY"]).with_id(7);

    // Same inputs twice.
    let baseline = generate_session_id(&moderator_a, &channel_x);
    let repeated = generate_session_id(&moderator_a, &channel_x);
    assert_eq!(
        baseline, repeated,
        "hash must be deterministic for same inputs"
    );

    // Only the moderator id differs.
    let other_moderator = generate_session_id(&moderator_b, &channel_x);
    assert_ne!(
        baseline, other_moderator,
        "changing moderator id should change session id"
    );

    // Only the channel name differs.
    let other_channel = generate_session_id(&moderator_a, &channel_y);
    assert_ne!(
        baseline, other_channel,
        "changing channel name should change session id"
    );

    assert!(
        baseline != 0 && other_moderator != 0 && other_channel != 0,
        "session ids should not be zero"
    );
}
2095
2096 #[tokio::test]
2097 #[traced_test]
2098 async fn test_subscription_notification_queue_drain() {
2099 let (mut control_plane_server, mut control_plane_client, client_config) =
2101 setup_control_planes(
2102 "127.0.0.1:50061",
2103 "queue-drain-server",
2104 "queue-drain-client",
2105 )
2106 .await;
2107
2108 let controller = control_plane_client.controller.clone();
2109 assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2110
2111 const N: usize = 5;
2112 for i in 0..N {
2113 let ctrl_msg = ControlMessage {
2114 message_id: uuid::Uuid::new_v4().to_string(),
2115 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2116 connections_to_create: vec![],
2117 connections_to_delete: vec![],
2118 subscriptions_to_set: vec![v1::Subscription {
2119 component_0: "queued".to_string(),
2120 component_1: "sub".to_string(),
2121 component_2: format!("name-{i}"),
2122 id: Some(i as u64),
2123 connection_id: "test-conn".to_string(),
2124 node_id: None,
2125 link_id: None,
2126 direction: None,
2127 }],
2128 subscriptions_to_delete: vec![],
2129 })),
2130 };
2131 controller
2132 .send_or_queue_notification(ctrl_msg, std::slice::from_ref(&client_config))
2133 .await;
2134 }
2135 assert_eq!(controller.inner.pending_notifications.lock().len(), N);
2136
2137 control_plane_server.run().await.expect("server run failed");
2138 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2139 control_plane_client.run().await.expect("client run failed");
2140 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2141
2142 assert_eq!(controller.inner.pending_notifications.lock().len(), 0);
2143 assert!(
2144 logs_contain(&format!("sending {} queued subscription notifications", N)),
2145 "Expected log about sending queued subscription notifications"
2146 );
2147
2148 drop(controller);
2149 drop(control_plane_server);
2150 drop(control_plane_client);
2151 }
2152
2153 #[tokio::test]
2154 #[traced_test]
2155 async fn test_delete_connection_by_link_id_success_ack() {
2156 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2157 setup_control_planes(
2158 "127.0.0.1:50081",
2159 "delete-linkid-server",
2160 "delete-linkid-client",
2161 )
2162 .await;
2163
2164 control_plane_server.run().await.unwrap();
2165 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2166 control_plane_client.run().await.unwrap();
2167 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2168
2169 let controller = control_plane_client.controller.clone();
2170 let link_id = "test-delete-link-id".to_string();
2171 let mut assigned = false;
2172 for _ in 0..50 {
2173 controller
2174 .inner
2175 .message_processor
2176 .connection_table()
2177 .for_each(|_, conn| {
2178 if !assigned {
2179 conn.set_link_id(link_id.clone());
2180 assigned = true;
2181 }
2182 });
2183 if assigned {
2184 break;
2185 }
2186 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2187 }
2188 assert!(
2189 assigned,
2190 "expected at least one connection to assign link_id"
2191 );
2192
2193 let ctrl_msg = ControlMessage {
2194 message_id: uuid::Uuid::new_v4().to_string(),
2195 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2196 connections_to_create: vec![],
2197 connections_to_delete: vec![link_id.clone()],
2198 subscriptions_to_set: vec![],
2199 subscriptions_to_delete: vec![],
2200 })),
2201 };
2202 let (tx, mut rx) = mpsc::channel(1);
2203 controller
2204 .handle_new_control_message(ctrl_msg, &tx)
2205 .await
2206 .expect("config command must be handled");
2207
2208 let ack_msg = rx
2209 .recv()
2210 .await
2211 .expect("expected ack message")
2212 .expect("ack should be ok");
2213 let ack = match ack_msg.payload {
2214 Some(Payload::ConfigCommandAck(ack)) => ack,
2215 _ => panic!("expected ConfigCommandAck payload"),
2216 };
2217 assert_eq!(ack.connections_status.len(), 1);
2218 assert_eq!(ack.connections_status[0].connection_id, link_id);
2219 assert!(ack.connections_status[0].success);
2220 }
2221
2222 #[tokio::test]
2223 #[traced_test]
2224 async fn test_delete_connection_by_link_id_unknown_fails_ack() {
2225 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2226 "127.0.0.1:50082",
2227 "delete-linkid-server-unknown",
2228 "delete-linkid-client-unknown",
2229 )
2230 .await;
2231
2232 let controller = control_plane_client.controller.clone();
2233 let ctrl_msg = ControlMessage {
2234 message_id: uuid::Uuid::new_v4().to_string(),
2235 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2236 connections_to_create: vec![],
2237 connections_to_delete: vec!["unknown-link-id".to_string()],
2238 subscriptions_to_set: vec![],
2239 subscriptions_to_delete: vec![],
2240 })),
2241 };
2242 let (tx, mut rx) = mpsc::channel(1);
2243 controller
2244 .handle_new_control_message(ctrl_msg, &tx)
2245 .await
2246 .expect("config command must be handled");
2247
2248 let ack_msg = rx
2249 .recv()
2250 .await
2251 .expect("expected ack message")
2252 .expect("ack should be ok");
2253 let ack = match ack_msg.payload {
2254 Some(Payload::ConfigCommandAck(ack)) => ack,
2255 _ => panic!("expected ConfigCommandAck payload"),
2256 };
2257 assert_eq!(ack.connections_status.len(), 1);
2258 assert_eq!(ack.connections_status[0].connection_id, "unknown-link-id");
2259 assert!(!ack.connections_status[0].success);
2260 assert!(ack.connections_status[0].error_msg.contains("not found"));
2261
2262 drop(control_plane_server);
2263 }
2264
2265 #[tokio::test]
2266 #[traced_test]
2267 async fn test_create_connection_with_existing_link_id_reuses_connection_ack() {
2268 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2269 setup_control_planes(
2270 "127.0.0.1:50083",
2271 "create-linkid-server",
2272 "create-linkid-client",
2273 )
2274 .await;
2275
2276 control_plane_server.run().await.unwrap();
2277 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2278 control_plane_client.run().await.unwrap();
2279 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2280
2281 let controller = control_plane_client.controller.clone();
2282 let link_id = "test-create-link-id".to_string();
2283 let mut assigned = false;
2284 for _ in 0..50 {
2285 controller
2286 .inner
2287 .message_processor
2288 .connection_table()
2289 .for_each(|_, conn| {
2290 if !assigned {
2291 conn.set_link_id(link_id.clone());
2292 assigned = true;
2293 }
2294 });
2295 if assigned {
2296 break;
2297 }
2298 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
2299 }
2300 assert!(
2301 assigned,
2302 "expected at least one connection to assign link_id"
2303 );
2304
2305 let endpoint = "http://127.0.0.1:59999";
2306 let connection_config = serde_json::json!({
2307 "endpoint": endpoint,
2308 "link_id": link_id
2309 })
2310 .to_string();
2311
2312 let ctrl_msg = ControlMessage {
2313 message_id: uuid::Uuid::new_v4().to_string(),
2314 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2315 connections_to_create: vec![v1::Connection {
2316 connection_id: "reuse-existing-link".to_string(),
2317 config_data: connection_config,
2318 }],
2319 connections_to_delete: vec![],
2320 subscriptions_to_set: vec![],
2321 subscriptions_to_delete: vec![],
2322 })),
2323 };
2324
2325 let (tx, mut rx) = mpsc::channel(1);
2326 controller
2327 .handle_new_control_message(ctrl_msg, &tx)
2328 .await
2329 .expect("config command must be handled");
2330
2331 let ack_msg = rx
2332 .recv()
2333 .await
2334 .expect("expected ack message")
2335 .expect("ack should be ok");
2336 let ack = match ack_msg.payload {
2337 Some(Payload::ConfigCommandAck(ack)) => ack,
2338 _ => panic!("expected ConfigCommandAck payload"),
2339 };
2340 assert_eq!(ack.connections_status.len(), 1);
2341 assert_eq!(
2342 ack.connections_status[0].connection_id,
2343 "reuse-existing-link"
2344 );
2345 assert!(ack.connections_status[0].success);
2346
2347 assert!(
2348 controller.inner.connections.read().contains_key(endpoint),
2349 "expected endpoint to be mapped to reused connection id"
2350 );
2351 }
2352
2353 #[tokio::test]
2354 #[traced_test]
2355 async fn test_subscription_set_unknown_link_id_fails_ack() {
2356 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2357 "127.0.0.1:50084",
2358 "sub-linkid-server-unknown",
2359 "sub-linkid-client-unknown",
2360 )
2361 .await;
2362
2363 let controller = control_plane_client.controller.clone();
2364 let ctrl_msg = ControlMessage {
2365 message_id: uuid::Uuid::new_v4().to_string(),
2366 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2367 connections_to_create: vec![],
2368 connections_to_delete: vec![],
2369 subscriptions_to_set: vec![v1::Subscription {
2370 component_0: "org".to_string(),
2371 component_1: "ns".to_string(),
2372 component_2: "agent".to_string(),
2373 id: Some(1),
2374 connection_id: String::new(),
2375 node_id: None,
2376 link_id: Some("missing-link-id".to_string()),
2377 direction: None,
2378 }],
2379 subscriptions_to_delete: vec![],
2380 })),
2381 };
2382 let (tx, mut rx) = mpsc::channel(1);
2383 controller
2384 .handle_new_control_message(ctrl_msg, &tx)
2385 .await
2386 .expect("config command must be handled");
2387
2388 let ack_msg = rx
2389 .recv()
2390 .await
2391 .expect("expected ack message")
2392 .expect("ack should be ok");
2393 let ack = match ack_msg.payload {
2394 Some(Payload::ConfigCommandAck(ack)) => ack,
2395 _ => panic!("expected ConfigCommandAck payload"),
2396 };
2397
2398 assert_eq!(ack.subscriptions_status.len(), 1);
2399 assert!(!ack.subscriptions_status[0].success);
2400 assert!(
2401 ack.subscriptions_status[0]
2402 .error_msg
2403 .contains("Connection with link_id missing-link-id not found")
2404 );
2405
2406 drop(control_plane_server);
2407 }
2408
2409 #[tokio::test]
2410 #[traced_test]
2411 async fn test_create_connection_invalid_config_fails_ack() {
2412 let (_control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2413 "127.0.0.1:50085",
2414 "create-invalid-config-server",
2415 "create-invalid-config-client",
2416 )
2417 .await;
2418
2419 let controller = control_plane_client.controller.clone();
2420 let ctrl_msg = ControlMessage {
2421 message_id: uuid::Uuid::new_v4().to_string(),
2422 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2423 connections_to_create: vec![v1::Connection {
2424 connection_id: "invalid-config-conn".to_string(),
2425 config_data: "{invalid-json".to_string(),
2426 }],
2427 connections_to_delete: vec![],
2428 subscriptions_to_set: vec![],
2429 subscriptions_to_delete: vec![],
2430 })),
2431 };
2432 let (tx, mut rx) = mpsc::channel(1);
2433 controller
2434 .handle_new_control_message(ctrl_msg, &tx)
2435 .await
2436 .expect("config command must be handled");
2437
2438 let ack_msg = rx
2439 .recv()
2440 .await
2441 .expect("expected ack message")
2442 .expect("ack should be ok");
2443 let ack = match ack_msg.payload {
2444 Some(Payload::ConfigCommandAck(ack)) => ack,
2445 _ => panic!("expected ConfigCommandAck payload"),
2446 };
2447 assert_eq!(ack.connections_status.len(), 1);
2448 assert!(!ack.connections_status[0].success);
2449 assert!(
2450 ack.connections_status[0]
2451 .error_msg
2452 .contains("Failed to parse config")
2453 );
2454 }
2455
2456 #[tokio::test]
2457 #[traced_test]
2458 async fn test_subscription_delete_unknown_link_id_fails_ack() {
2459 let (control_plane_server, control_plane_client, _client_cfg) = setup_control_planes(
2460 "127.0.0.1:50086",
2461 "sub-del-linkid-server-unknown",
2462 "sub-del-linkid-client-unknown",
2463 )
2464 .await;
2465
2466 let controller = control_plane_client.controller.clone();
2467 let ctrl_msg = ControlMessage {
2468 message_id: uuid::Uuid::new_v4().to_string(),
2469 payload: Some(Payload::ConfigCommand(v1::ConfigurationCommand {
2470 connections_to_create: vec![],
2471 connections_to_delete: vec![],
2472 subscriptions_to_set: vec![],
2473 subscriptions_to_delete: vec![v1::Subscription {
2474 component_0: "org".to_string(),
2475 component_1: "ns".to_string(),
2476 component_2: "agent".to_string(),
2477 id: Some(1),
2478 connection_id: String::new(),
2479 node_id: None,
2480 link_id: Some("missing-link-id-delete".to_string()),
2481 direction: None,
2482 }],
2483 })),
2484 };
2485 let (tx, mut rx) = mpsc::channel(1);
2486 controller
2487 .handle_new_control_message(ctrl_msg, &tx)
2488 .await
2489 .expect("config command must be handled");
2490
2491 let ack_msg = rx
2492 .recv()
2493 .await
2494 .expect("expected ack message")
2495 .expect("ack should be ok");
2496 let ack = match ack_msg.payload {
2497 Some(Payload::ConfigCommandAck(ack)) => ack,
2498 _ => panic!("expected ConfigCommandAck payload"),
2499 };
2500
2501 assert_eq!(ack.subscriptions_status.len(), 1);
2502 assert!(!ack.subscriptions_status[0].success);
2503 assert!(
2504 ack.subscriptions_status[0]
2505 .error_msg
2506 .contains("Connection with link_id missing-link-id-delete not found")
2507 );
2508
2509 drop(control_plane_server);
2510 }
2511
2512 #[tokio::test]
2513 #[traced_test]
2514 async fn test_shutdown_drains_resources() {
2515 let (mut control_plane_server, mut control_plane_client, _client_cfg) =
2517 setup_control_planes(
2518 "127.0.0.1:50071",
2519 "shutdown-server-instance",
2520 "shutdown-client-instance",
2521 )
2522 .await;
2523
2524 control_plane_server
2526 .run()
2527 .await
2528 .expect("server should start");
2529 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2530 control_plane_client
2531 .run()
2532 .await
2533 .expect("client should start");
2534 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2535
2536 let server_tokens_before = control_plane_server
2538 .controller
2539 .inner
2540 .cancellation_tokens
2541 .read()
2542 .len();
2543 assert!(
2544 server_tokens_before > 0,
2545 "expected server to have active cancellation tokens before shutdown"
2546 );
2547
2548 let client_tokens_before = control_plane_client
2549 .controller
2550 .inner
2551 .cancellation_tokens
2552 .read()
2553 .len();
2554 assert!(
2555 client_tokens_before > 0,
2556 "expected client to have active cancellation tokens before shutdown"
2557 );
2558
2559 control_plane_client
2561 .shutdown()
2562 .await
2563 .expect("client shutdown ok");
2564 control_plane_server
2565 .shutdown()
2566 .await
2567 .expect("server shutdown ok");
2568
2569 let server_tokens_after = control_plane_server
2571 .controller
2572 .inner
2573 .cancellation_tokens
2574 .read()
2575 .len();
2576 assert_eq!(
2577 server_tokens_after, 0,
2578 "expected server cancellation tokens to be drained after shutdown"
2579 );
2580
2581 let client_tokens_after = control_plane_client
2582 .controller
2583 .inner
2584 .cancellation_tokens
2585 .read()
2586 .len();
2587 assert_eq!(
2588 client_tokens_after, 0,
2589 "expected client cancellation tokens to be drained after shutdown"
2590 );
2591
2592 assert!(
2594 control_plane_server.shutdown().await.is_err(),
2595 "second shutdown on server should return an error"
2596 );
2597 assert!(
2598 control_plane_client.shutdown().await.is_err(),
2599 "second shutdown on client should return an error"
2600 );
2601 }
2602
2603 #[tokio::test]
2604 #[traced_test]
2605 async fn test_shutdown_without_run() {
2606 let (control_plane_server, mut _control_plane_client, _client_cfg) = setup_control_planes(
2608 "127.0.0.1:50072",
2609 "shutdown-no-run-server",
2610 "shutdown-no-run-client",
2611 )
2612 .await;
2613
2614 assert_eq!(
2616 control_plane_server
2617 .controller
2618 .inner
2619 .cancellation_tokens
2620 .read()
2621 .len(),
2622 0,
2623 "expected zero cancellation tokens before shutdown when not run"
2624 );
2625
2626 control_plane_server
2628 .shutdown()
2629 .await
2630 .expect("shutdown without prior run should succeed");
2631
2632 assert_eq!(
2634 control_plane_server
2635 .controller
2636 .inner
2637 .cancellation_tokens
2638 .read()
2639 .len(),
2640 0,
2641 "expected zero cancellation tokens after shutdown when not run"
2642 );
2643
2644 assert!(
2646 control_plane_server.shutdown().await.is_err(),
2647 "second shutdown should error due to missing drain signal"
2648 );
2649 }
2650}