Skip to main content

slim_session/
common.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6// Third-party crates
7use smallvec::SmallVec;
8use tonic::Status;
9
10use slim_datapath::api::{
11    EncodedName, ProtoMessage as Message, ProtoName, ProtoSessionMessageType,
12};
13
14// Local crate
15use crate::SessionError;
16
17/// Reserved session id
18pub const SESSION_RANGE: std::ops::Range<u32> = 0..(u32::MAX - 1000);
19
20/// Unspecified session ID constant
21pub const SESSION_UNSPECIFIED: u32 = u32::MAX;
22
23/// Channel used in the path service -> app
24pub(crate) type AppChannelSender =
25    tokio::sync::mpsc::UnboundedSender<Result<Message, SessionError>>;
26/// Channel used in the path app -> service
27pub type AppChannelReceiver = tokio::sync::mpsc::UnboundedReceiver<Result<Message, SessionError>>;
28/// Channel used in the path service -> slim
29pub type SlimChannelSender = tokio::sync::mpsc::Sender<Result<Message, Status>>;
30
31/// The state of a session
32#[derive(Clone, PartialEq, Debug)]
33#[allow(dead_code)]
34pub enum State {
35    Active,
36    Inactive,
37}
38
39#[derive(Clone, Copy, PartialEq, Debug)]
40pub enum MessageDirection {
41    North,
42    South,
43}
44
45/// A message to be sent outbound from the session layers.
46#[derive(Debug)]
47pub enum OutboundMessage {
48    /// Send to SLIM (network-bound). Identity will be applied by the processing loop.
49    ToSlim(Message),
50    /// Send to the application.
51    ToApp(Result<Message, SessionError>),
52}
53
54/// The result of processing a session message.
55/// Layers return this instead of sending messages internally.
56/// Uses SmallVec since most operations produce 1-2 outbound messages.
57#[derive(Debug, Default)]
58pub struct SessionOutput {
59    pub messages: SmallVec<[OutboundMessage; 2]>,
60}
61
62impl SessionOutput {
63    pub fn new() -> Self {
64        Self {
65            messages: SmallVec::new(),
66        }
67    }
68
69    /// Create a new SessionOutput containing a single ToSlim message.
70    pub fn to_slim(message: Message) -> Self {
71        let mut s = Self::new();
72        s.messages.push(OutboundMessage::ToSlim(message));
73        s
74    }
75
76    /// Create a new SessionOutput containing a single ToApp message.
77    pub fn to_app(message: Result<Message, SessionError>) -> Self {
78        let mut s = Self::new();
79        s.messages.push(OutboundMessage::ToApp(message));
80        s
81    }
82
83    /// Push a ToSlim message onto an existing output.
84    pub fn push_slim(&mut self, message: Message) {
85        self.messages.push(OutboundMessage::ToSlim(message));
86    }
87
88    /// Push a ToApp message onto an existing output.
89    pub fn push_app(&mut self, message: Result<Message, SessionError>) {
90        self.messages.push(OutboundMessage::ToApp(message));
91    }
92
93    pub fn is_empty(&self) -> bool {
94        self.messages.is_empty()
95    }
96
97    /// Merge another output into this one.
98    pub fn extend(&mut self, other: SessionOutput) {
99        self.messages.extend(other.messages);
100    }
101}
102
103/// Message types for communication between session components
104#[allow(clippy::large_enum_variant)]
105#[derive(Debug)]
106pub enum SessionMessage {
107    /// application message coming from the app or from slim
108    OnMessage {
109        message: Message,
110        direction: MessageDirection,
111        /// Optional channel to signal when message processing is complete
112        ack_tx: Option<tokio::sync::oneshot::Sender<Result<(), SessionError>>>,
113    },
114    /// Error occurred during message processing
115    MessageError { error: SessionError },
116    /// timeout signal for a message (ack,rtx or control messages)
117    /// that needs to be send again
118    TimerTimeout {
119        message_id: u32,
120        message_type: ProtoSessionMessageType,
121        name: Option<EncodedName>,
122        timeouts: u32,
123    },
124    /// timer failure, signal to the owner of the packet that
125    /// the message will not be delivered
126    TimerFailure {
127        message_id: u32,
128        message_type: ProtoSessionMessageType,
129        name: Option<EncodedName>,
130        timeouts: u32,
131    },
132    /// sent by the controller sender when a disconnection is detected
133    ParticipantDisconnected { name: Option<ProtoName> },
134    /// message from session layer to the session controller
135    /// to start to the close procedures of the session
136    StartDrain { grace_period: Duration },
137    /// message from session controller to session layer
138    /// to notify that the session can be removed safely
139    DeleteSession { session_id: u32 },
140    /// Query the participants list from the handler
141    GetParticipantsList {
142        tx: tokio::sync::oneshot::Sender<Vec<ProtoName>>,
143    },
144    /// Deferred cleanup after leave reply has been dispatched.
145    /// Performs route/subscription cleanup that must happen after the LeaveReply
146    /// is sent (in the return-based output model, dispatch happens after on_message returns).
147    LeaveCleanup,
148}