socketioxide_mongodb/drivers/mod.rs
1use futures_core::{Future, Stream};
2use serde::{Deserialize, Serialize};
3use socketioxide_core::{Sid, Uid};
4
/// A driver implementation for the [mongodb](https://docs.rs/mongodb) change stream backend.
6#[cfg(feature = "mongodb")]
7pub mod mongodb;
8
9/// The mongodb document that will be inserted in the collection to share requests/responses.
10#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
11#[serde(rename_all = "camelCase")]
12pub struct Item {
13 /// Some header infos to filter, dispatch and decode correctly requests/responses.
14 #[serde(flatten)]
15 pub header: ItemHeader,
16 /// The targeted socket.io namespace
17 pub ns: String,
18 /// The origin server of the request.
19 pub origin: Uid,
20 /// The msgpack-encoded payload of our request/response.
21 pub data: Vec<u8>,
22 /// A created at flag inserted only for ttl collection and used for the ttl index.
23 #[cfg(feature = "ttl-index")]
24 #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
25 pub created_at: Option<bson::DateTime>,
26}
27impl Item {
28 pub(crate) fn new(
29 header: ItemHeader,
30 data: &impl Serialize,
31 origin: Uid,
32 ns: &str,
33 ) -> Result<Self, rmp_serde::encode::Error> {
34 let data = rmp_serde::to_vec(data)?;
35 Ok(Self {
36 header,
37 data,
38 origin,
39 ns: ns.to_string(),
40 created_at: None,
41 })
42 }
43 #[cfg(feature = "ttl-index")]
44 pub(crate) fn new_ttl(
45 header: ItemHeader,
46 data: &impl Serialize,
47 origin: Uid,
48 ns: &str,
49 ) -> Result<Self, rmp_serde::encode::Error> {
50 let data = rmp_serde::to_vec(data)?;
51 Ok(Self {
52 header,
53 data,
54 origin,
55 ns: ns.to_string(),
56 created_at: Some(bson::DateTime::now()),
57 })
58 }
59}
60
61/// A header to identify the type of message being sent, its origin, and its target.
62#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
63#[serde(tag = "t")]
64pub enum ItemHeader {
65 /// A request.
66 Req {
67 /// If it is set, the request is sent to one server only.
68 /// Otherwise it is considered to be a broadcast request.
69 #[serde(skip_serializing_if = "Option::is_none")]
70 target: Option<Uid>,
71 },
72 /// A response.
73 Res {
74 /// The request ID we are answering to.
75 request: Sid,
76 /// The target server to send the response to. This usually corresponds to
77 /// the server that sent the corresponding request.
78 target: Uid,
79 },
80}
81impl ItemHeader {
82 /// Get the target of the req/res
83 pub fn get_target(&self) -> Option<Uid> {
84 match self {
85 ItemHeader::Req { target, .. } => *target,
86 ItemHeader::Res { target, .. } => Some(*target),
87 }
88 }
89}
90
/// The driver trait can be used to support different MongoDB backends.
/// It must share handlers/connection between its clones.
pub trait Driver: Clone + Send + Sync + 'static {
    /// The error type for the driver.
    type Error: std::error::Error + Send + 'static;
    /// The event stream returned by the [`Driver::watch`] method.
    ///
    /// Each element is either a received [`Item`] or a driver error.
    type EvStream: Stream<Item = Result<Item, Self::Error>> + Unpin + Send + 'static;

    /// Watch for document insertions on the collection. This should be implemented by change streams
    /// but could be implemented by anything else.
    ///
    /// The implementation should take care of filtering the events.
    /// It must skip events that originate from our own server and, when a target is set, it should
    /// only pass events sent for us. Here is the corresponding filter:
    /// `origin != self.uid && (target == null || target == self.uid)`.
    fn watch(
        &self,
        server_id: Uid,
        ns: &str,
    ) -> impl Future<Output = Result<Self::EvStream, Self::Error>> + Send;

    /// Emit a document to the collection.
    fn emit(&self, data: &Item) -> impl Future<Output = Result<(), Self::Error>> + Send;
}