slim_controller/
service.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use slim_config::component::id::ID;
9use slim_config::grpc::server::ServerConfig;
10use tokio::sync::mpsc;
11use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
12use tokio_util::sync::CancellationToken;
13use tonic::{Request, Response, Status};
14use tracing::{debug, error, info};
15
16use crate::api::proto::api::v1::control_message::Payload;
17use crate::api::proto::api::v1::controller_service_server::ControllerServiceServer;
18use crate::api::proto::api::v1::{
19    self, ConnectionListResponse, ConnectionType, SubscriptionListResponse,
20};
21use crate::api::proto::api::v1::{
22    Ack, ConnectionEntry, ControlMessage, SubscriptionEntry,
23    controller_service_client::ControllerServiceClient,
24    controller_service_server::ControllerService as GrpcControllerService,
25};
26use crate::errors::ControllerError;
27use slim_config::grpc::client::ClientConfig;
28use slim_datapath::api::ProtoMessage as PubsubMessage;
29use slim_datapath::message_processing::MessageProcessor;
30use slim_datapath::messages::Name;
31use slim_datapath::messages::utils::SlimHeaderFlags;
32use slim_datapath::tables::SubscriptionTable;
33
/// Sender half of a control channel: each item is either a control message
/// or a gRPC status.
type TxChannel = mpsc::Sender<Result<ControlMessage, Status>>;
/// Control-channel senders keyed by endpoint string.
type TxChannels = HashMap<String, TxChannel>;
36
/// Shared internal state of the controller service.
///
/// Holds the instance ID, the message processor, the connections created on
/// behalf of the control plane, and the channels used to talk to the datapath
/// and to control-plane peers.
/// It is normally wrapped in an Arc to allow shared ownership across multiple threads.
#[derive(Debug)]
struct ControllerServiceInternal {
    /// ID of this SLIM instance (sent as `node_id` when registering with a control plane)
    id: ID,

    /// underlying message processor
    message_processor: Arc<MessageProcessor>,

    /// map from an endpoint string to the connection ID returned by the message
    /// processor when the connection to that endpoint was created; subscription
    /// commands look their connection up by this key
    connections: Arc<parking_lot::RwLock<HashMap<String, u64>>>,

    /// channel to send messages into the datapath
    tx_slim: mpsc::Sender<Result<PubsubMessage, Status>>,

    /// channel to receive messages from the datapath
    /// NOTE(review): never read in the visible code; presumably held only to keep
    /// the local connection alive - confirm
    _rx_slim: mpsc::Receiver<Result<PubsubMessage, Status>>,

    /// channels to send control messages, keyed by endpoint
    tx_channels: parking_lot::RwLock<TxChannels>,

    /// cancellation tokens of the running servers and clients, keyed by endpoint;
    /// cancelled on stop and when the control plane is dropped
    cancellation_tokens: parking_lot::RwLock<HashMap<String, CancellationToken>>,

    /// drain watch channel
    drain_rx: drain::Watch,
}
67
/// Cloneable handle to the shared controller state.
/// Cloning only clones the inner `Arc`, so every clone operates on the same
/// connections, channels and cancellation tokens.
#[derive(Debug, Clone)]
struct ControllerService {
    /// internal service state
    inner: Arc<ControllerServiceInternal>,
}
73
/// The ControlPlane service is the main entry point for the controller service.
/// It owns the server and client configurations and starts them through `run`.
#[derive(Debug)]
pub struct ControlPlane {
    /// server configurations started by `run`
    servers: Vec<ServerConfig>,

    /// client configurations connected by `run`
    clients: Vec<ClientConfig>,

    /// controller service shared with every server and client stream
    controller: ControllerService,
}
86
87/// ControllerServiceInternal implements Drop trait to cancel all running listeners and
88/// clean up resources.
89impl Drop for ControlPlane {
90    fn drop(&mut self) {
91        // cancel all running listeners
92        for (_endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
93            token.cancel();
94        }
95    }
96}
97
/// Construction, start-up and shutdown of the control plane.
99impl ControlPlane {
100    /// Create a new ControlPlane service instance
101    /// This function initializes the ControlPlane with the given ID, servers, clients, and message processor.
102    /// It also sets up the internal state, including the connections and channels.
103    /// # Arguments
104    /// * `id` - The ID of the SLIM instance.
105    /// * `servers` - A vector of server configurations.
106    /// * `clients` - A vector of client configurations.
107    /// * `drain_rx` - A drain watch channel for graceful shutdown.
108    /// * `message_processor` - An Arc to the message processor instance.
109    /// # Returns
110    /// A new instance of ControlPlane.
111    pub fn new(
112        id: ID,
113        servers: Vec<ServerConfig>,
114        clients: Vec<ClientConfig>,
115        drain_rx: drain::Watch,
116        message_processor: Arc<MessageProcessor>,
117    ) -> Self {
118        // create local connection with the message processor
119        let (_, tx_slim, rx_slim) = message_processor.register_local_connection();
120
121        ControlPlane {
122            servers,
123            clients,
124            controller: ControllerService {
125                inner: Arc::new(ControllerServiceInternal {
126                    id,
127                    message_processor,
128                    connections: Arc::new(parking_lot::RwLock::new(HashMap::new())),
129                    tx_slim,
130                    _rx_slim: rx_slim,
131                    tx_channels: parking_lot::RwLock::new(HashMap::new()),
132                    cancellation_tokens: parking_lot::RwLock::new(HashMap::new()),
133                    drain_rx,
134                }),
135            },
136        }
137    }
138
139    /// Take an existing ControlPlane instance and return a new one with the provided clients.
140    pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
141        self.clients = clients;
142        self
143    }
144
145    /// Take an existing ControlPlane instance and return a new one with the provided servers.
146    pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
147        self.servers = servers;
148        self
149    }
150
151    /// Run the clients and servers of the ControlPlane service.
152    /// This function starts all the servers and clients defined in the ControlPlane.
153    /// # Returns
154    /// A Result indicating success or failure of the operation.
155    /// # Errors
156    /// If there is an error starting any of the servers or clients, it will return a ControllerError.
157    pub async fn run(&mut self) -> Result<(), ControllerError> {
158        info!("starting controller service");
159
160        // Collect servers to avoid borrowing self both mutably and immutably
161        let servers = self.servers.clone();
162        let clients = self.clients.clone();
163
164        // run all servers
165        for server in servers {
166            self.run_server(server)?;
167        }
168
169        // run all clients
170        for client in clients {
171            self.run_client(client).await?;
172        }
173
174        Ok(())
175    }
176
177    /// Stop the ControlPlane service.
178    /// This function stops all running listeners and cancels any ongoing operations.
179    /// It cleans up the internal state and ensures that all resources are released properly.
180    pub fn stop(&mut self) {
181        info!("stopping controller service");
182
183        // cancel all running listeners
184        for (endpoint, token) in self.controller.inner.cancellation_tokens.write().drain() {
185            info!(%endpoint, "stopping");
186            token.cancel();
187        }
188    }
189
190    /// Run a client configuration.
191    /// This function connects to the control plane using the provided client configuration.
192    /// It checks if the client is already running and if not, it starts a new connection.
193    async fn run_client(&mut self, client: ClientConfig) -> Result<(), ControllerError> {
194        if self
195            .controller
196            .inner
197            .cancellation_tokens
198            .read()
199            .contains_key(&client.endpoint)
200        {
201            return Err(ControllerError::ConfigError(format!(
202                "client {} is already running",
203                client.endpoint
204            )));
205        }
206
207        let cancellation_token = CancellationToken::new();
208
209        let tx = self
210            .controller
211            .connect(client.clone(), cancellation_token.clone())
212            .await?;
213
214        // Store the cancellation token in the controller service
215        self.controller
216            .inner
217            .cancellation_tokens
218            .write()
219            .insert(client.endpoint.clone(), cancellation_token);
220
221        // Store the sender in the tx_channels map
222        self.controller
223            .inner
224            .tx_channels
225            .write()
226            .insert(client.endpoint.clone(), tx);
227
228        // return the sender for control messages
229        Ok(())
230    }
231
232    /// Run a server configuration.
233    /// This function starts a server using the provided server configuration.
234    /// It checks if the server is already running and if not, it starts a new server.
235    pub fn run_server(&mut self, config: ServerConfig) -> Result<(), ControllerError> {
236        info!(%config.endpoint, "starting control plane server");
237
238        // Check if the server is already running
239        if self
240            .controller
241            .inner
242            .cancellation_tokens
243            .read()
244            .contains_key(&config.endpoint)
245        {
246            error!("server {} is already running", config.endpoint);
247            return Err(ControllerError::ConfigError(format!(
248                "server {} is already running",
249                config.endpoint
250            )));
251        }
252
253        let token = config
254            .run_server(
255                &[ControllerServiceServer::new(self.controller.clone())],
256                self.controller.inner.drain_rx.clone(),
257            )
258            .map_err(|e| {
259                error!("failed to run server {}: {}", config.endpoint, e);
260                ControllerError::ConfigError(e.to_string())
261            })?;
262
263        // Store the cancellation token in the controller service
264        self.controller
265            .inner
266            .cancellation_tokens
267            .write()
268            .insert(config.endpoint.clone(), token.clone());
269
270        info!(%config.endpoint, "control plane server started");
271
272        Ok(())
273    }
274}
275
276impl ControllerService {
    /// Maximum number of connection attempts made by `connect` before giving up.
    const MAX_RETRIES: i32 = 10;
278
279    /// Handle new control messages.
280    async fn handle_new_control_message(
281        &self,
282        msg: ControlMessage,
283        tx: &mpsc::Sender<Result<ControlMessage, Status>>,
284    ) -> Result<(), ControllerError> {
285        match msg.payload {
286            Some(ref payload) => {
287                match payload {
288                    Payload::ConfigCommand(config) => {
289                        for conn in &config.connections_to_create {
290                            info!("received a connection to create: {:?}", conn);
291                            let client_config =
292                                serde_json::from_str::<ClientConfig>(&conn.config_data)
293                                    .map_err(|e| ControllerError::ConfigError(e.to_string()))?;
294                            let client_endpoint = &client_config.endpoint;
295
296                            // connect to an endpoint if it's not already connected
297                            if !self.inner.connections.read().contains_key(client_endpoint) {
298                                match client_config.to_channel() {
299                                    Err(e) => {
300                                        error!("error reading channel config {:?}", e);
301                                    }
302                                    Ok(channel) => {
303                                        let ret = self
304                                            .inner
305                                            .message_processor
306                                            .connect(
307                                                channel,
308                                                Some(client_config.clone()),
309                                                None,
310                                                None,
311                                            )
312                                            .await
313                                            .map_err(|e| {
314                                                ControllerError::ConnectionError(e.to_string())
315                                            });
316
317                                        let conn_id = match ret {
318                                            Err(e) => {
319                                                error!("connection error: {:?}", e);
320                                                return Err(ControllerError::ConnectionError(
321                                                    e.to_string(),
322                                                ));
323                                            }
324                                            Ok(conn_id) => conn_id.1,
325                                        };
326
327                                        self.inner
328                                            .connections
329                                            .write()
330                                            .insert(client_endpoint.clone(), conn_id);
331                                    }
332                                }
333                            }
334                        }
335
336                        for subscription in &config.subscriptions_to_set {
337                            if !self
338                                .inner
339                                .connections
340                                .read()
341                                .contains_key(&subscription.connection_id)
342                            {
343                                error!("connection {} not found", subscription.connection_id);
344                                continue;
345                            }
346
347                            let conn = self
348                                .inner
349                                .connections
350                                .read()
351                                .get(&subscription.connection_id)
352                                .cloned()
353                                .unwrap();
354                            let source = Name::from_strings([
355                                subscription.component_0.as_str(),
356                                subscription.component_1.as_str(),
357                                subscription.component_2.as_str(),
358                            ])
359                            .with_id(0);
360                            let name = Name::from_strings([
361                                subscription.component_0.as_str(),
362                                subscription.component_1.as_str(),
363                                subscription.component_2.as_str(),
364                            ])
365                            .with_id(subscription.id.unwrap_or(Name::NULL_COMPONENT));
366
367                            let msg = PubsubMessage::new_subscribe(
368                                &source,
369                                &name,
370                                Some(SlimHeaderFlags::default().with_recv_from(conn)),
371                            );
372
373                            if let Err(e) = self.send_control_message(msg).await {
374                                error!("failed to subscribe: {}", e);
375                            }
376                        }
377
378                        for subscription in &config.subscriptions_to_delete {
379                            if !self
380                                .inner
381                                .connections
382                                .read()
383                                .contains_key(&subscription.connection_id)
384                            {
385                                error!("connection {} not found", subscription.connection_id);
386                                continue;
387                            }
388
389                            let conn = self
390                                .inner
391                                .connections
392                                .read()
393                                .get(&subscription.connection_id)
394                                .cloned()
395                                .unwrap();
396                            let source = Name::from_strings([
397                                subscription.component_0.as_str(),
398                                subscription.component_1.as_str(),
399                                subscription.component_2.as_str(),
400                            ])
401                            .with_id(0);
402                            let name = Name::from_strings([
403                                subscription.component_0.as_str(),
404                                subscription.component_1.as_str(),
405                                subscription.component_2.as_str(),
406                            ])
407                            .with_id(subscription.id.unwrap_or(Name::NULL_COMPONENT));
408
409                            let msg = PubsubMessage::new_unsubscribe(
410                                &source,
411                                &name,
412                                Some(SlimHeaderFlags::default().with_recv_from(conn)),
413                            );
414
415                            if let Err(e) = self.send_control_message(msg).await {
416                                error!("failed to unsubscribe: {}", e);
417                            }
418                        }
419
420                        let ack = Ack {
421                            original_message_id: msg.message_id.clone(),
422                            success: true,
423                            messages: vec![],
424                        };
425
426                        let reply = ControlMessage {
427                            message_id: uuid::Uuid::new_v4().to_string(),
428                            payload: Some(Payload::Ack(ack)),
429                        };
430
431                        if let Err(e) = tx.send(Ok(reply)).await {
432                            error!("failed to send ACK: {}", e);
433                        }
434                    }
435                    Payload::SubscriptionListRequest(_) => {
436                        const CHUNK_SIZE: usize = 100;
437
438                        let conn_table = self.inner.message_processor.connection_table();
439                        let mut entries = Vec::new();
440
441                        self.inner.message_processor.subscription_table().for_each(
442                            |name, id, local, remote| {
443                                let mut entry = SubscriptionEntry {
444                                    component_0: name.components_strings().unwrap()[0].to_string(),
445                                    component_1: name.components_strings().unwrap()[1].to_string(),
446                                    component_2: name.components_strings().unwrap()[2].to_string(),
447                                    id: Some(id),
448                                    ..Default::default()
449                                };
450
451                                for &cid in local {
452                                    entry.local_connections.push(ConnectionEntry {
453                                        id: cid,
454                                        connection_type: ConnectionType::Local as i32,
455                                        config_data: "{}".to_string(),
456                                    });
457                                }
458
459                                for &cid in remote {
460                                    if let Some(conn) = conn_table.get(cid as usize) {
461                                        entry.remote_connections.push(ConnectionEntry {
462                                            id: cid,
463                                            connection_type: ConnectionType::Remote as i32,
464                                            config_data: match conn.config_data() {
465                                                Some(data) => serde_json::to_string(data)
466                                                    .unwrap_or_else(|_| "{}".to_string()),
467                                                None => "{}".to_string(),
468                                            },
469                                        });
470                                    } else {
471                                        error!("no connection entry for id {}", cid);
472                                    }
473                                }
474                                entries.push(entry);
475                            },
476                        );
477
478                        for chunk in entries.chunks(CHUNK_SIZE) {
479                            let resp = ControlMessage {
480                                message_id: uuid::Uuid::new_v4().to_string(),
481                                payload: Some(Payload::SubscriptionListResponse(
482                                    SubscriptionListResponse {
483                                        entries: chunk.to_vec(),
484                                    },
485                                )),
486                            };
487
488                            if let Err(e) = tx.try_send(Ok(resp)) {
489                                error!("failed to send subscription batch: {}", e);
490                            }
491                        }
492                    }
493                    Payload::ConnectionListRequest(_) => {
494                        let mut all_entries = Vec::new();
495                        self.inner
496                            .message_processor
497                            .connection_table()
498                            .for_each(|id, conn| {
499                                all_entries.push(ConnectionEntry {
500                                    id: id as u64,
501                                    connection_type: ConnectionType::Remote as i32,
502                                    config_data: match conn.config_data() {
503                                        Some(data) => serde_json::to_string(data)
504                                            .unwrap_or_else(|_| "{}".to_string()),
505                                        None => "{}".to_string(),
506                                    },
507                                });
508                            });
509
510                        const CHUNK_SIZE: usize = 100;
511                        for chunk in all_entries.chunks(CHUNK_SIZE) {
512                            let resp = ControlMessage {
513                                message_id: uuid::Uuid::new_v4().to_string(),
514                                payload: Some(Payload::ConnectionListResponse(
515                                    ConnectionListResponse {
516                                        entries: chunk.to_vec(),
517                                    },
518                                )),
519                            };
520
521                            if let Err(e) = tx.try_send(Ok(resp)) {
522                                error!("failed to send connection list batch: {}", e);
523                            }
524                        }
525                    }
526                    Payload::Ack(_ack) => {
527                        // received an ack, do nothing - this should not happen
528                    }
529                    Payload::SubscriptionListResponse(_) => {
530                        // received a subscription list response, do nothing - this should not happen
531                    }
532                    Payload::ConnectionListResponse(_) => {
533                        // received a connection list response, do nothing - this should not happen
534                    }
535                    Payload::RegisterNodeRequest(_) => {
536                        error!("received a register node request, this should not happen");
537                    }
538                    Payload::RegisterNodeResponse(_) => {
539                        // received a register node response, do nothing
540                    }
541                    Payload::DeregisterNodeRequest(_) => {
542                        error!("received a deregister node request, this should not happen");
543                    }
544                    Payload::DeregisterNodeResponse(_) => {
545                        // received a deregister node response, do nothing
546                    }
547                }
548            }
549            None => {
550                error!(
551                    "received control message {} with no payload",
552                    msg.message_id
553                );
554            }
555        }
556
557        Ok(())
558    }
559
560    /// Send a control message to SLIM.
561    async fn send_control_message(&self, msg: PubsubMessage) -> Result<(), ControllerError> {
562        self.inner.tx_slim.send(Ok(msg)).await.map_err(|e| {
563            error!("error sending message into datapath: {}", e);
564            ControllerError::DatapathError(e.to_string())
565        })
566    }
567
568    /// Process the control message stream.
569    fn process_control_message_stream(
570        &self,
571        config: Option<ClientConfig>,
572        mut stream: impl Stream<Item = Result<ControlMessage, Status>> + Unpin + Send + 'static,
573        tx: mpsc::Sender<Result<ControlMessage, Status>>,
574        cancellation_token: CancellationToken,
575    ) -> tokio::task::JoinHandle<()> {
576        let this = self.clone();
577        let drain = this.inner.drain_rx.clone();
578        tokio::spawn(async move {
579            // Send a register message to the control plane
580            let endpoint = config
581                .as_ref()
582                .map(|c| c.endpoint.clone())
583                .unwrap_or_else(|| "unknown".to_string());
584            info!(%endpoint, "connected to control plane");
585
586            let mut retry_connect = false;
587
588            let register_request = ControlMessage {
589                message_id: uuid::Uuid::new_v4().to_string(),
590                payload: Some(Payload::RegisterNodeRequest(v1::RegisterNodeRequest {
591                    node_id: this.inner.id.to_string(),
592                })),
593            };
594
595            // send register request if client
596            if config.is_some() {
597                if let Err(e) = tx.send(Ok(register_request)).await {
598                    error!("failed to send register request: {}", e);
599                    return;
600                }
601            }
602
603            // TODO; here we should wait for an ack
604
605            loop {
606                tokio::select! {
607                    next = stream.next() => {
608                        match next {
609                            Some(Ok(msg)) => {
610                                if let Err(e) = this.handle_new_control_message(msg, &tx).await {
611                                    error!("error processing incoming control message: {:?}", e);
612                                }
613                            }
614                            Some(Err(e)) => {
615                                if let Some(io_err) = Self::match_for_io_error(&e) {
616                                    if io_err.kind() == std::io::ErrorKind::BrokenPipe {
617                                        info!("connection closed by peer");
618                                        retry_connect = true;
619                                    }
620                                } else {
621                                    error!(%e, "error receiving control messages");
622                                }
623
624                                break;
625                            }
626                            None => {
627                                debug!("end of stream");
628                                retry_connect = true;
629                                break;
630                            }
631                        }
632                    }
633                    _ = cancellation_token.cancelled() => {
634                        debug!("shutting down stream on cancellation token");
635                        break;
636                    }
637                    _ = drain.clone().signaled() => {
638                        debug!("shutting down stream on drain");
639                        break;
640                    }
641                }
642            }
643
644            info!(%endpoint, "control plane stream closed");
645
646            if retry_connect {
647                if let Some(config) = config {
648                    info!(%config.endpoint, "retrying connection to control plane");
649                    this.connect(config.clone(), cancellation_token)
650                        .await
651                        .map_or_else(
652                            |e| {
653                                error!("failed to reconnect to control plane: {}", e);
654                            },
655                            |tx| {
656                                info!(%config.endpoint, "reconnected to control plane");
657
658                                this.inner
659                                    .tx_channels
660                                    .write()
661                                    .insert(config.endpoint.clone(), tx);
662                            },
663                        )
664                }
665            }
666        })
667    }
668
669    /// Connect to the control plane using the provided client configuration.
670    /// This function attempts to establish a connection to the control plane and returns a sender for control messages.
671    /// It retries the connection a specified number of times if it fails.
672    async fn connect(
673        &self,
674        config: ClientConfig,
675        cancellation_token: CancellationToken,
676    ) -> Result<mpsc::Sender<Result<ControlMessage, Status>>, ControllerError> {
677        info!(%config.endpoint, "connecting to control plane");
678
679        let channel = config.to_channel().map_err(|e| {
680            error!("error reading channel config: {}", e);
681            ControllerError::ConfigError(e.to_string())
682        })?;
683
684        let mut client = ControllerServiceClient::new(channel);
685        for i in 0..Self::MAX_RETRIES {
686            let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
687            let out_stream = ReceiverStream::new(rx).map(|res| res.expect("mapping error"));
688            match client.open_control_channel(Request::new(out_stream)).await {
689                Ok(stream) => {
690                    // process the control message stream
691                    self.process_control_message_stream(
692                        Some(config),
693                        stream.into_inner(),
694                        tx.clone(),
695                        cancellation_token.clone(),
696                    );
697
698                    return Ok(tx);
699                }
700                Err(e) => {
701                    error!(%e, "connection error, retrying {}/{}", i + 1, Self::MAX_RETRIES);
702                }
703            };
704
705            // sleep 1 sec between each connection retry
706            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
707        }
708
709        Err(ControllerError::ConfigError(format!(
710            "failed to connect to control plane after {} retries",
711            Self::MAX_RETRIES
712        )))
713    }
714
715    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
716        let mut err: &(dyn std::error::Error + 'static) = err_status;
717
718        loop {
719            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
720                return Some(io_err);
721            }
722
723            // h2::Error do not expose std::io::Error with `source()`
724            // https://github.com/hyperium/h2/pull/462
725            if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
726                if let Some(io_err) = h2_err.get_io() {
727                    return Some(io_err);
728                }
729            }
730
731            err = err.source()?;
732        }
733    }
734}
735
736#[tonic::async_trait]
737impl GrpcControllerService for ControllerService {
738    type OpenControlChannelStream =
739        Pin<Box<dyn Stream<Item = Result<ControlMessage, Status>> + Send + 'static>>;
740
741    async fn open_control_channel(
742        &self,
743        request: Request<tonic::Streaming<ControlMessage>>,
744    ) -> Result<Response<Self::OpenControlChannelStream>, Status> {
745        // Get the remote endpoint from the request metadata
746        let remote_endpoint = request
747            .remote_addr()
748            .map(|addr| addr.to_string())
749            .unwrap_or_else(|| "unknown".to_string());
750
751        let stream = request.into_inner();
752        let (tx, rx) = mpsc::channel::<Result<ControlMessage, Status>>(128);
753
754        let cancellation_token = CancellationToken::new();
755
756        self.process_control_message_stream(None, stream, tx.clone(), cancellation_token.clone());
757
758        // store the sender in the tx_channels map
759        self.inner
760            .tx_channels
761            .write()
762            .insert(remote_endpoint.clone(), tx);
763
764        // store the cancellation token in the controller service
765        self.inner
766            .cancellation_tokens
767            .write()
768            .insert(remote_endpoint.clone(), cancellation_token);
769
770        let out_stream = ReceiverStream::new(rx);
771        Ok(Response::new(
772            Box::pin(out_stream) as Self::OpenControlChannelStream
773        ))
774    }
775}
776
#[cfg(test)]
mod tests {
    use super::*;
    use slim_config::component::id::Kind;
    use tracing_test::traced_test;

    /// Starts a server-side and a client-side control plane on localhost,
    /// lets the client connect, checks the captured logs for the expected
    /// line, then drops both planes and drains their channels.
    #[tokio::test]
    #[traced_test]
    async fn test_end_to_end() {
        // Pause used after starting each side so it has time to come up.
        let settle_time = tokio::time::Duration::from_millis(100);

        // Identities of the two SLIM instances
        let server_id =
            ID::new_with_name(Kind::new("slim").unwrap(), "test_server_instance").unwrap();
        let client_id =
            ID::new_with_name(Kind::new("slim").unwrap(), "test_client_instance").unwrap();

        // Insecure listener configuration and the matching client endpoint
        let listener_cfg = ServerConfig::with_endpoint("127.0.0.1:50051")
            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());
        let dialer_cfg = ClientConfig::with_endpoint("http://127.0.0.1:50051")
            .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());

        // One drain channel per side
        let (server_signal, server_watch) = drain::channel();
        let (client_signal, client_watch) = drain::channel();

        // One message processor per side, each bound to its drain watcher
        let client_processor = Arc::new(MessageProcessor::with_drain_channel(
            client_watch.clone(),
        ));
        let server_processor = Arc::new(MessageProcessor::with_drain_channel(
            server_watch.clone(),
        ));

        // The server plane only listens, the client plane only dials
        let mut server_plane = ControlPlane::new(
            server_id,
            vec![listener_cfg],
            vec![],
            server_watch,
            server_processor,
        );
        let mut client_plane = ControlPlane::new(
            client_id,
            vec![],
            vec![dialer_cfg],
            client_watch,
            client_processor,
        );

        // Bring the server up first and give it time to start listening
        server_plane.run().await.unwrap();
        tokio::time::sleep(settle_time).await;

        // Then start the client and give it time to connect
        client_plane.run().await.unwrap();
        tokio::time::sleep(settle_time).await;

        // The test treats this log line as evidence that the server received
        // the client's connection
        assert!(logs_contain(
            "received a register node request, this should not happen"
        ));

        // Dropping both planes should cancel the running listeners and close
        // the connections gracefully.
        drop(server_plane);
        drop(client_plane);

        // Nothing should be left to drain, so these are expected not to block
        server_signal.drain().await;
        client_signal.drain().await;
    }
}