socketioxide_mongodb/drivers/
mongodb.rs1use std::task::Poll;
2
3use crate::MessageExpirationStrategy;
4
5use super::{Driver, Item};
6use futures_core::Stream;
7use mongodb::{
8 IndexModel,
9 change_stream::{
10 ChangeStream,
11 event::{ChangeStreamEvent, OperationType},
12 },
13 options::IndexOptions,
14};
15use socketioxide_core::Uid;
16
17pub use mongodb as mongodb_client;
18
19#[derive(Debug, Clone)]
21pub struct MongoDbDriver {
22 collec: mongodb::Collection<Item>,
23}
24
25impl MongoDbDriver {
26 pub async fn new(
29 db: mongodb::Database,
30 collection: &str,
31 eviction_strategy: &MessageExpirationStrategy,
32 ) -> Result<Self, mongodb::error::Error> {
33 let collec = match eviction_strategy {
34 MessageExpirationStrategy::CappedCollection(size) => {
35 tracing::debug!(
36 ?size,
37 "configuring capped collection as an expiration strategy"
38 );
39 db.create_collection(collection)
40 .capped(true)
41 .size(*size)
42 .await?;
43 db.collection(collection)
44 }
45 MessageExpirationStrategy::TtlIndex(ttl) => {
46 tracing::debug!(?ttl, "configuring TTL index as an expiration strategy");
47 let options = IndexOptions::builder()
48 .expire_after(*ttl)
49 .background(true)
50 .build();
51 let index = IndexModel::builder()
52 .keys(mongodb::bson::doc! { "createdAt": 1 })
53 .options(options)
54 .build();
55
56 let collec = db.collection(collection);
57 collec.create_index(index).await?;
58 collec
59 }
60 };
61 Ok(Self { collec })
62 }
63}
64
65pin_project_lite::pin_project! {
66 pub struct EvStream {
68 #[pin]
69 stream: ChangeStream<ChangeStreamEvent<Item>>,
70 }
71}
72impl Stream for EvStream {
73 type Item = Result<Item, mongodb::error::Error>;
74
75 fn poll_next(
76 self: std::pin::Pin<&mut Self>,
77 cx: &mut std::task::Context<'_>,
78 ) -> Poll<Option<Self::Item>> {
79 match self.project().stream.poll_next(cx) {
80 Poll::Ready(Some(Ok(ChangeStreamEvent {
81 full_document: Some(doc),
82 operation_type: OperationType::Insert,
83 ..
84 }))) => Poll::Ready(Some(Ok(doc))),
85 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
86 Poll::Ready(None) => Poll::Ready(None),
87 _ => Poll::Pending,
88 }
89 }
90}
91
92impl Driver for MongoDbDriver {
93 type Error = mongodb::error::Error;
94 type EvStream = EvStream;
95
96 async fn watch(&self, server_id: Uid, ns: &str) -> Result<EvStream, Self::Error> {
97 let stream = self
98 .collec
99 .watch()
100 .pipeline([mongodb::bson::doc! {
101 "$match": {
102 "fullDocument.origin": { "$ne": server_id.as_str() },
103 "fullDocument.ns": ns,
104 "$or": [
105 { "fullDocument.target": server_id.as_str() },
106 { "fullDocument.target": { "$exists": false } }
107 ],
108 },
109 }])
110 .await?;
111 Ok(EvStream { stream })
112 }
113
114 async fn emit(&self, data: &Item) -> Result<(), Self::Error> {
115 self.collec.insert_one(data).await?;
116 Ok(())
117 }
118}