socketioxide_mongodb/drivers/
mod.rs

1use futures_core::{Future, Stream};
2use serde::{Deserialize, Serialize};
3use socketioxide_core::{Sid, Uid};
4
5/// A driver implementation for the [mongodb](docs.rs/mongodb) change stream backend.
6#[cfg(feature = "mongodb")]
7pub mod mongodb;
8
9/// The mongodb document that will be inserted in the collection to share requests/responses.
10#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
11#[serde(rename_all = "camelCase")]
12pub struct Item {
13    /// Some header infos to filter, dispatch and decode correctly requests/responses.
14    #[serde(flatten)]
15    pub header: ItemHeader,
16    /// The targeted socket.io namespace
17    pub ns: String,
18    /// The origin server of the request.
19    pub origin: Uid,
20    /// The msgpack-encoded payload of our request/response.
21    pub data: Vec<u8>,
22    /// A created at flag inserted only for ttl collection and used for the ttl index.
23    #[cfg(feature = "ttl-index")]
24    #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
25    pub created_at: Option<bson::DateTime>,
26}
27impl Item {
28    pub(crate) fn new(
29        header: ItemHeader,
30        data: &impl Serialize,
31        origin: Uid,
32        ns: &str,
33    ) -> Result<Self, rmp_serde::encode::Error> {
34        let data = rmp_serde::to_vec(data)?;
35        Ok(Self {
36            header,
37            data,
38            origin,
39            ns: ns.to_string(),
40            created_at: None,
41        })
42    }
43    #[cfg(feature = "ttl-index")]
44    pub(crate) fn new_ttl(
45        header: ItemHeader,
46        data: &impl Serialize,
47        origin: Uid,
48        ns: &str,
49    ) -> Result<Self, rmp_serde::encode::Error> {
50        let data = rmp_serde::to_vec(data)?;
51        Ok(Self {
52            header,
53            data,
54            origin,
55            ns: ns.to_string(),
56            created_at: Some(bson::DateTime::now()),
57        })
58    }
59}
60
61/// A header to identify the type of message being sent, its origin, and its target.
62#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
63#[serde(tag = "t")]
64pub enum ItemHeader {
65    /// A request.
66    Req {
67        /// If it is set, the request is sent to one server only.
68        /// Otherwise it is considered to be a broadcast request.
69        #[serde(skip_serializing_if = "Option::is_none")]
70        target: Option<Uid>,
71    },
72    /// A response.
73    Res {
74        /// The request ID we are answering to.
75        request: Sid,
76        /// The target server to send the response to. This usually corresponds to
77        /// the server that sent the corresponding request.
78        target: Uid,
79    },
80}
81impl ItemHeader {
82    /// Get the target of the req/res
83    pub fn get_target(&self) -> Option<Uid> {
84        match self {
85            ItemHeader::Req { target, .. } => *target,
86            ItemHeader::Res { target, .. } => Some(*target),
87        }
88    }
89}
90
91/// The driver trait can be used to support different MongoDB backends.
92/// It must share handlers/connection between its clones.
93pub trait Driver: Clone + Send + Sync + 'static {
94    /// The error type for the driver.
95    type Error: std::error::Error + Send + 'static;
96    /// The event stream returned by the [`Driver::watch`] method.
97    type EvStream: Stream<Item = Result<Item, Self::Error>> + Unpin + Send + 'static;
98
99    /// Watch for document insertions on the collection. This should be implemented by change streams
100    /// but could be implemented by anything else.
101    ///
102    /// The implementation should take care of filtering the events.
103    /// It must pass events that originate from our server and if the target is set it should pass events
104    /// sent for us. Here is the corresponding filter:
105    /// `origin != self.uid && (target == null || target == self.uid)`.
106    fn watch(
107        &self,
108        server_id: Uid,
109        ns: &str,
110    ) -> impl Future<Output = Result<Self::EvStream, Self::Error>> + Send;
111
112    /// Emit an document to the collection.
113    fn emit(&self, data: &Item) -> impl Future<Output = Result<(), Self::Error>> + Send;
114}