socketioxide_mongodb/drivers/
mongodb.rs

1use std::task::Poll;
2
3use crate::MessageExpirationStrategy;
4
5use super::{Driver, Item};
6use futures_core::Stream;
7use mongodb::{
8    IndexModel,
9    change_stream::{
10        ChangeStream,
11        event::{ChangeStreamEvent, OperationType},
12    },
13    options::IndexOptions,
14};
15use socketioxide_core::Uid;
16
17pub use mongodb as mongodb_client;
18
19/// A driver implementation for the [mongodb](docs.rs/mongodb) change stream backend.
20#[derive(Debug, Clone)]
21pub struct MongoDbDriver {
22    collec: mongodb::Collection<Item>,
23}
24
25impl MongoDbDriver {
26    /// Create a new [`MongoDbDriver`] with a connection to a [`mongodb`] database,
27    /// a collection and an eviction strategy.
28    pub async fn new(
29        db: mongodb::Database,
30        collection: &str,
31        eviction_strategy: &MessageExpirationStrategy,
32    ) -> Result<Self, mongodb::error::Error> {
33        let collec = match eviction_strategy {
34            MessageExpirationStrategy::CappedCollection(size) => {
35                tracing::debug!(
36                    ?size,
37                    "configuring capped collection as an expiration strategy"
38                );
39                db.create_collection(collection)
40                    .capped(true)
41                    .size(*size)
42                    .await?;
43                db.collection(collection)
44            }
45            MessageExpirationStrategy::TtlIndex(ttl) => {
46                tracing::debug!(?ttl, "configuring TTL index as an expiration strategy");
47                let options = IndexOptions::builder()
48                    .expire_after(*ttl)
49                    .background(true)
50                    .build();
51                let index = IndexModel::builder()
52                    .keys(mongodb::bson::doc! { "createdAt": 1 })
53                    .options(options)
54                    .build();
55
56                let collec = db.collection(collection);
57                collec.create_index(index).await?;
58                collec
59            }
60        };
61        Ok(Self { collec })
62    }
63}
64
65pin_project_lite::pin_project! {
66    /// The stream of document insertion returned by the mongodb change stream.
67    pub struct EvStream {
68        #[pin]
69        stream: ChangeStream<ChangeStreamEvent<Item>>,
70    }
71}
72impl Stream for EvStream {
73    type Item = Result<Item, mongodb::error::Error>;
74
75    fn poll_next(
76        self: std::pin::Pin<&mut Self>,
77        cx: &mut std::task::Context<'_>,
78    ) -> Poll<Option<Self::Item>> {
79        match self.project().stream.poll_next(cx) {
80            Poll::Ready(Some(Ok(ChangeStreamEvent {
81                full_document: Some(doc),
82                operation_type: OperationType::Insert,
83                ..
84            }))) => Poll::Ready(Some(Ok(doc))),
85            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
86            Poll::Ready(None) => Poll::Ready(None),
87            _ => Poll::Pending,
88        }
89    }
90}
91
92impl Driver for MongoDbDriver {
93    type Error = mongodb::error::Error;
94    type EvStream = EvStream;
95
96    async fn watch(&self, server_id: Uid, ns: &str) -> Result<EvStream, Self::Error> {
97        let stream = self
98            .collec
99            .watch()
100            .pipeline([mongodb::bson::doc! {
101              "$match": {
102                    "fullDocument.origin": { "$ne": server_id.as_str() },
103                    "fullDocument.ns": ns,
104                    "$or": [
105                        { "fullDocument.target": server_id.as_str() },
106                        { "fullDocument.target": { "$exists": false } }
107                    ],
108              },
109            }])
110            .await?;
111        Ok(EvStream { stream })
112    }
113
114    async fn emit(&self, data: &Item) -> Result<(), Self::Error> {
115        self.collec.insert_one(data).await?;
116        Ok(())
117    }
118}