Skip to main content

slim_session/
common.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6// Third-party crates
7use tonic::Status;
8
9use slim_datapath::{
10    api::{ProtoMessage as Message, ProtoSessionMessageType, ProtoSessionType},
11    messages::{Name, utils::MessageError},
12};
13
14// Local crate
15use crate::SessionError;
16
17/// Reserved session id
18pub const SESSION_RANGE: std::ops::Range<u32> = 0..(u32::MAX - 1000);
19
20/// Unspecified session ID constant
21pub const SESSION_UNSPECIFIED: u32 = u32::MAX;
22
23/// Channel used in the path service -> app
24pub(crate) type AppChannelSender =
25    tokio::sync::mpsc::UnboundedSender<Result<Message, SessionError>>;
26/// Channel used in the path app -> service
27pub type AppChannelReceiver = tokio::sync::mpsc::UnboundedReceiver<Result<Message, SessionError>>;
28/// Channel used in the path service -> slim
29pub type SlimChannelSender = tokio::sync::mpsc::Sender<Result<Message, Status>>;
30
31/// The state of a session
32#[derive(Clone, PartialEq, Debug)]
33#[allow(dead_code)]
34pub enum State {
35    Active,
36    Inactive,
37}
38
39#[derive(Clone, Copy, PartialEq, Debug)]
40pub enum MessageDirection {
41    North,
42    South,
43}
44
45#[allow(clippy::too_many_arguments)]
46pub fn new_message_from_session_fields(
47    local_name: &Name,
48    target_name: &Name,
49    target_conn: u64,
50    is_error: bool,
51    session_type: ProtoSessionType,
52    message_type: ProtoSessionMessageType,
53    session_id: u32,
54    message_id: u32,
55    metadata: Option<std::collections::HashMap<String, String>>,
56) -> Result<Message, MessageError> {
57    let mut builder = Message::builder()
58        .source(local_name.clone())
59        .destination(target_name.clone())
60        .identity("")
61        .forward_to(target_conn)
62        .session_type(session_type)
63        .session_message_type(message_type)
64        .session_id(session_id)
65        .message_id(message_id)
66        .application_payload("", vec![]);
67
68    if is_error {
69        builder = builder.error(true);
70    }
71
72    if let Some(meta) = metadata {
73        builder = builder.metadata_map(meta);
74    }
75
76    builder.build_publish()
77}
78
79/// Message types for communication between session components
80#[allow(clippy::large_enum_variant)]
81#[derive(Debug)]
82pub enum SessionMessage {
83    /// application message coming from the app or from slim
84    OnMessage {
85        message: Message,
86        direction: MessageDirection,
87        /// Optional channel to signal when message processing is complete
88        ack_tx: Option<tokio::sync::oneshot::Sender<Result<(), SessionError>>>,
89    },
90    /// Error occurred during message processing
91    MessageError { error: SessionError },
92    /// timeout signal for a message (ack,rtx or control messages)
93    /// that needs to be send again
94    TimerTimeout {
95        message_id: u32,
96        message_type: ProtoSessionMessageType,
97        name: Option<Name>,
98        timeouts: u32,
99    },
100    /// timer failure, signal to the owner of the packet that
101    /// the message will not be delivered
102    TimerFailure {
103        message_id: u32,
104        message_type: ProtoSessionMessageType,
105        name: Option<Name>,
106        timeouts: u32,
107    },
108    /// sent by the controller sender when a disconnection is detected
109    ParticipantDisconnected { name: Option<Name> },
110    /// message from session layer to the session controller
111    /// to start to the close procedures of the session
112    StartDrain { grace_period: Duration },
113    /// message from session controller to session layer
114    /// to notify that the session can be removed safely
115    DeleteSession { session_id: u32 },
116    /// Query the participants list from the handler
117    GetParticipantsList {
118        tx: tokio::sync::oneshot::Sender<Vec<Name>>,
119    },
120}