mongo_tracing/
mongo_tracing.rs

1use std::borrow::Borrow;
2
3use mongodb::bson::{Bson, Document};
4use mongodb::change_stream::event::ChangeStreamEvent;
5use mongodb::change_stream::session::SessionChangeStream;
6use mongodb::change_stream::ChangeStream;
7use mongodb::error::Result;
8use mongodb::options::{
9    AggregateOptions, ChangeStreamOptions, CountOptions, CreateIndexOptions, DeleteOptions,
10    DistinctOptions, DropCollectionOptions, DropIndexOptions, EstimatedDocumentCountOptions,
11    FindOneOptions, FindOptions, InsertManyOptions, InsertOneOptions, ListIndexesOptions,
12    ReadConcern, ReplaceOptions, UpdateModifications, UpdateOptions, WriteConcern,
13};
14use mongodb::results::{
15    CreateIndexResult, CreateIndexesResult, DeleteResult, InsertManyResult, InsertOneResult,
16    UpdateResult,
17};
18use mongodb::{ClientSession, Collection, Cursor, Database, IndexModel, SessionCursor};
19use serde::de::DeserializeOwned;
20use serde::Serialize;
21use tracing::instrument;
22
/// Metadata about a collection that the instrumented methods read when
/// filling in their tracing span fields.
///
/// Derives `Debug` and `Clone` so that types embedding it can be printed and
/// duplicated without hand-written impls.
#[derive(Debug, Clone)]
struct CollectionInfo {
    /// Name of the owning database; recorded on spans as `db.name`.
    database_name: String,
}
26
27pub trait InstrumentedCollectionExt {
28    fn collection_instrumented<T>(&self, name: &str) -> InstrumentedCollection<T>;
29}
30
31impl InstrumentedCollectionExt for Database {
32    fn collection_instrumented<T>(&self, name: &str) -> InstrumentedCollection<T> {
33        InstrumentedCollection {
34            info: CollectionInfo {
35                database_name: self.name().parse().unwrap(),
36            },
37            inner: self.collection(name),
38        }
39    }
40}
41
42pub struct InstrumentedCollection<T> {
43    info: CollectionInfo,
44    inner: Collection<T>,
45}
46
47impl<T> InstrumentedCollection<T>
48where
49    T: DeserializeOwned + Unpin + Send + Sync,
50{
51    #[instrument(
52    fields(
53    db.name = % self.info.database_name ,
54    db.system = "mongodb",
55    db.collection = % self.inner.name(),
56    otel.kind = "client",
57    ),
58    skip(self, filter, options)
59    )]
60    pub async fn find_one(
61        &self,
62        filter: impl Into<Option<Document>>,
63        options: impl Into<Option<FindOneOptions>>,
64    ) -> Result<Option<T>> {
65        self.inner.find_one(filter, options).await
66    }
67    #[instrument(
68    fields(
69    db.name = % self.info.database_name ,
70    db.system = "mongodb",
71    db.collection = % self.inner.name(),
72    otel.kind = "client",
73    ),
74    skip(self, filter, options,session)
75    )]
76    pub async fn find_one_with_session(
77        &self,
78        filter: impl Into<Option<Document>>,
79        options: impl Into<Option<FindOneOptions>>,
80        session: &mut ClientSession,
81    ) -> Result<Option<T>> {
82        self.inner
83            .find_one_with_session(filter, options, session)
84            .await
85    }
86}
87
88impl<T> InstrumentedCollection<T>
89where
90    T: Serialize,
91{
92    #[instrument(
93    fields(
94    db.name = % self.info.database_name ,
95    db.system = "mongodb",
96    db.collection = % self.inner.name(),
97    otel.kind = "client",
98    ),
99    skip(self, docs, options)
100    )]
101    pub async fn insert_many(
102        &self,
103        docs: impl IntoIterator<Item = impl Borrow<T>>,
104        options: impl Into<Option<InsertManyOptions>>,
105    ) -> Result<InsertManyResult> {
106        self.inner.insert_many(docs, options).await
107    }
108
109    #[instrument(
110    fields(
111    db.name = % self.info.database_name ,
112    db.system = "mongodb",
113    db.collection = % self.inner.name(),
114    otel.kind = "client",
115    ),
116    skip(self, docs, options,session)
117    )]
118    pub async fn insert_many_with_session(
119        &self,
120        docs: impl IntoIterator<Item = impl Borrow<T>>,
121        options: impl Into<Option<InsertManyOptions>>,
122        session: &mut ClientSession,
123    ) -> Result<InsertManyResult> {
124        self.inner
125            .insert_many_with_session(docs, options, session)
126            .await
127    }
128    #[instrument(
129    fields(
130    db.name = % self.info.database_name ,
131    db.system = "mongodb",
132    db.collection = % self.inner.name(),
133    otel.kind = "client",
134    ),
135    skip(self, doc, options)
136    )]
137    pub async fn insert_one(
138        &self,
139        doc: impl Borrow<T>,
140        options: impl Into<Option<InsertOneOptions>>,
141    ) -> Result<InsertOneResult> {
142        self.inner.insert_one(doc, options).await
143    }
144    #[instrument(
145    fields(
146    db.name = % self.info.database_name ,
147    db.system = "mongodb",
148    db.collection = % self.inner.name(),
149    otel.kind = "client",
150    ),
151    skip(self, doc, options,session)
152    )]
153    pub async fn insert_one_with_session(
154        &self,
155        doc: impl Borrow<T>,
156        options: impl Into<Option<InsertOneOptions>>,
157        session: &mut ClientSession,
158    ) -> Result<InsertOneResult> {
159        self.inner
160            .insert_one_with_session(doc, options, session)
161            .await
162    }
163    #[instrument(
164    fields(
165    db.name = % self.info.database_name ,
166    db.system = "mongodb",
167    db.collection = % self.inner.name(),
168    otel.kind = "client",
169    ),
170    skip(self, query, replacement,options)
171    )]
172    pub async fn replace_one(
173        &self,
174        query: Document,
175        replacement: impl Borrow<T>,
176        options: impl Into<Option<ReplaceOptions>>,
177    ) -> Result<UpdateResult> {
178        self.inner.replace_one(query, replacement, options).await
179    }
180    #[instrument(
181    fields(
182    db.name = % self.info.database_name ,
183    db.system = "mongodb",
184    db.collection = % self.inner.name(),
185    otel.kind = "client",
186    ),
187    skip(self, query, replacement,options,session)
188    )]
189    pub async fn replace_one_with_session(
190        &self,
191        query: Document,
192        replacement: impl Borrow<T>,
193        options: impl Into<Option<ReplaceOptions>>,
194        session: &mut ClientSession,
195    ) -> Result<UpdateResult> {
196        self.inner
197            .replace_one_with_session(query, replacement, options, session)
198            .await
199    }
200}
201
202impl<T> InstrumentedCollection<T> {
203    #[instrument(
204    fields(
205    db.name = % self.info.database_name ,
206    db.system = "mongodb",
207    db.collection = % self.inner.name(),
208    otel.kind = "client",
209    ),
210    skip(self, query, update,options)
211    )]
212    pub async fn update_one(
213        &self,
214        query: Document,
215        update: impl Into<UpdateModifications>,
216        options: impl Into<Option<UpdateOptions>>,
217    ) -> Result<UpdateResult> {
218        self.inner.update_one(query, update, options).await
219    }
220    #[instrument(
221    fields(
222    db.name = % self.info.database_name ,
223    db.system = "mongodb",
224    db.collection = % self.inner.name(),
225    otel.kind = "client",
226    ),
227    skip(self)
228    )]
229    pub fn read_concern(&self) -> Option<&ReadConcern> {
230        self.inner.read_concern()
231    }
232    #[instrument(
233    fields(
234    db.name = % self.info.database_name ,
235    db.system = "mongodb",
236    db.collection = % self.inner.name(),
237    otel.kind = "client",
238    ),
239    skip(self)
240    )]
241    pub fn write_concern(&self) -> Option<&WriteConcern> {
242        self.inner.write_concern()
243    }
244    #[instrument(
245    fields(
246    db.name = % self.info.database_name ,
247    db.system = "mongodb",
248    db.collection = % self.inner.name(),
249    otel.kind = "client",
250    ),
251    skip(self,options)
252    )]
253    pub async fn drop(&self, options: impl Into<Option<DropCollectionOptions>>) -> Result<()> {
254        self.inner.drop(options).await
255    }
256    #[instrument(
257    fields(
258    db.name = % self.info.database_name ,
259    db.system = "mongodb",
260    db.collection = % self.inner.name(),
261    otel.kind = "client",
262    ),
263    skip(self,options,session)
264    )]
265    pub async fn drop_with_session(
266        &self,
267        options: impl Into<Option<DropCollectionOptions>>,
268        session: &mut ClientSession,
269    ) -> Result<()> {
270        self.inner.drop_with_session(options, session).await
271    }
272    #[instrument(
273    fields(
274    db.name = % self.info.database_name ,
275    db.system = "mongodb",
276    db.collection = % self.inner.name(),
277    otel.kind = "client",
278    ),
279    skip(self,pipeline,options)
280    )]
281    pub async fn aggregate(
282        &self,
283        pipeline: impl IntoIterator<Item = Document>,
284        options: impl Into<Option<AggregateOptions>>,
285    ) -> Result<Cursor<Document>> {
286        self.inner.aggregate(pipeline, options).await
287    }
288    #[instrument(
289    fields(
290    db.name = % self.info.database_name ,
291    db.system = "mongodb",
292    db.collection = % self.inner.name(),
293    otel.kind = "client",
294    ),
295    skip(self,pipeline,options,session)
296    )]
297    pub async fn aggregate_with_session(
298        &self,
299        pipeline: impl IntoIterator<Item = Document>,
300        options: impl Into<Option<AggregateOptions>>,
301        session: &mut ClientSession,
302    ) -> Result<SessionCursor<Document>> {
303        self.inner
304            .aggregate_with_session(pipeline, options, session)
305            .await
306    }
307    #[instrument(
308    fields(
309    db.name = % self.info.database_name ,
310    db.system = "mongodb",
311    db.collection = % self.inner.name(),
312    otel.kind = "client",
313    ),
314    skip(self,options)
315    )]
316    pub async fn estimated_document_count(
317        &self,
318        options: impl Into<Option<EstimatedDocumentCountOptions>>,
319    ) -> Result<u64> {
320        self.inner.estimated_document_count(options).await
321    }
322    #[instrument(
323    fields(
324    db.name = % self.info.database_name ,
325    db.system = "mongodb",
326    db.collection = % self.inner.name(),
327    otel.kind = "client",
328    ),
329    skip(self,filter,options)
330    )]
331    pub async fn count_documents(
332        &self,
333        filter: impl Into<Option<Document>>,
334        options: impl Into<Option<CountOptions>>,
335    ) -> Result<u64> {
336        self.inner.count_documents(filter, options).await
337    }
338    #[instrument(
339    fields(
340    db.name = % self.info.database_name ,
341    db.system = "mongodb",
342    db.collection = % self.inner.name(),
343    otel.kind = "client",
344    ),
345    skip(self,filter,options,session)
346    )]
347    pub async fn count_documents_with_session(
348        &self,
349        filter: impl Into<Option<Document>>,
350        options: impl Into<Option<CountOptions>>,
351        session: &mut ClientSession,
352    ) -> Result<u64> {
353        self.inner
354            .count_documents_with_session(filter, options, session)
355            .await
356    }
357    #[instrument(
358    fields(
359    db.name = % self.info.database_name ,
360    db.system = "mongodb",
361    db.collection = % self.inner.name(),
362    otel.kind = "client",
363    ),
364    skip(self,index,options)
365    )]
366    pub async fn create_index(
367        &self,
368        index: IndexModel,
369        options: impl Into<Option<CreateIndexOptions>>,
370    ) -> Result<CreateIndexResult> {
371        self.inner.create_index(index, options).await
372    }
373    #[instrument(
374    fields(
375    db.name = % self.info.database_name ,
376    db.system = "mongodb",
377    db.collection = % self.inner.name(),
378    otel.kind = "client",
379    ),
380    skip(self,index,options,session)
381    )]
382    pub async fn create_index_with_session(
383        &self,
384        index: IndexModel,
385        options: impl Into<Option<CreateIndexOptions>>,
386        session: &mut ClientSession,
387    ) -> Result<CreateIndexResult> {
388        self.inner
389            .create_index_with_session(index, options, session)
390            .await
391    }
392    #[instrument(
393    fields(
394    db.name = % self.info.database_name ,
395    db.system = "mongodb",
396    db.collection = % self.inner.name(),
397    otel.kind = "client",
398    ),
399    skip(self,indexes,options)
400    )]
401    pub async fn create_indexes(
402        &self,
403        indexes: impl IntoIterator<Item = IndexModel>,
404        options: impl Into<Option<CreateIndexOptions>>,
405    ) -> Result<CreateIndexesResult> {
406        self.inner.create_indexes(indexes, options).await
407    }
408    #[instrument(
409    fields(
410    db.name = % self.info.database_name ,
411    db.system = "mongodb",
412    db.collection = % self.inner.name(),
413    otel.kind = "client",
414    ),
415    skip(self,indexes,options,session)
416    )]
417    pub async fn create_indexes_with_session(
418        &self,
419        indexes: impl IntoIterator<Item = IndexModel>,
420        options: impl Into<Option<CreateIndexOptions>>,
421        session: &mut ClientSession,
422    ) -> Result<CreateIndexesResult> {
423        self.inner
424            .create_indexes_with_session(indexes, options, session)
425            .await
426    }
427    #[instrument(
428    fields(
429    db.name = % self.info.database_name ,
430    db.system = "mongodb",
431    db.collection = % self.inner.name(),
432    otel.kind = "client",
433    ),
434    skip(self,query,options)
435    )]
436    pub async fn delete_many(
437        &self,
438        query: Document,
439        options: impl Into<Option<DeleteOptions>>,
440    ) -> Result<DeleteResult> {
441        self.inner.delete_many(query, options).await
442    }
443    #[instrument(
444    fields(
445    db.name = % self.info.database_name ,
446    db.system = "mongodb",
447    db.collection = % self.inner.name(),
448    otel.kind = "client",
449    ),
450    skip(self,query,options,session)
451    )]
452    pub async fn delete_many_with_session(
453        &self,
454        query: Document,
455        options: impl Into<Option<DeleteOptions>>,
456        session: &mut ClientSession,
457    ) -> Result<DeleteResult> {
458        self.inner
459            .delete_many_with_session(query, options, session)
460            .await
461    }
462    #[instrument(
463    fields(
464    db.name = % self.info.database_name ,
465    db.system = "mongodb",
466    db.collection = % self.inner.name(),
467    otel.kind = "client",
468    ),
469    skip(self,query,options)
470    )]
471    pub async fn delete_one(
472        &self,
473        query: Document,
474        options: impl Into<Option<DeleteOptions>>,
475    ) -> Result<DeleteResult> {
476        self.inner.delete_one(query, options).await
477    }
478    #[instrument(
479    fields(
480    db.name = % self.info.database_name ,
481    db.system = "mongodb",
482    db.collection = % self.inner.name(),
483    otel.kind = "client",
484    ),
485    skip(self,query,options,session)
486    )]
487    pub async fn delete_one_with_session(
488        &self,
489        query: Document,
490        options: impl Into<Option<DeleteOptions>>,
491        session: &mut ClientSession,
492    ) -> Result<DeleteResult> {
493        self.inner
494            .delete_one_with_session(query, options, session)
495            .await
496    }
497    #[instrument(
498    fields(
499    db.name = % self.info.database_name ,
500    db.system = "mongodb",
501    db.collection = % self.inner.name(),
502    otel.kind = "client",
503    ),
504    skip(self,field_name,filter,options)
505    )]
506    pub async fn distinct(
507        &self,
508        field_name: impl AsRef<str>,
509        filter: impl Into<Option<Document>>,
510        options: impl Into<Option<DistinctOptions>>,
511    ) -> Result<Vec<Bson>> {
512        self.inner.distinct(field_name, filter, options).await
513    }
514    #[instrument(
515    fields(
516    db.name = % self.info.database_name ,
517    db.system = "mongodb",
518    db.collection = % self.inner.name(),
519    otel.kind = "client",
520    ),
521    skip(self,field_name,filter,options,session)
522    )]
523    pub async fn distinct_with_session(
524        &self,
525        field_name: impl AsRef<str>,
526        filter: impl Into<Option<Document>>,
527        options: impl Into<Option<DistinctOptions>>,
528        session: &mut ClientSession,
529    ) -> Result<Vec<Bson>> {
530        self.inner
531            .distinct_with_session(field_name, filter, options, session)
532            .await
533    }
534    #[instrument(
535    fields(
536    db.name = % self.info.database_name ,
537    db.system = "mongodb",
538    db.collection = % self.inner.name(),
539    otel.kind = "client",
540    ),
541    skip(self,name,options)
542    )]
543    pub async fn drop_index(
544        &self,
545        name: impl AsRef<str>,
546        options: impl Into<Option<DropIndexOptions>>,
547    ) -> Result<()> {
548        self.inner.drop_index(name, options).await
549    }
550    #[instrument(
551    fields(
552    db.name = % self.info.database_name ,
553    db.system = "mongodb",
554    db.collection = % self.inner.name(),
555    otel.kind = "client",
556    ),
557    skip(self,name,options,session)
558    )]
559    pub async fn drop_index_with_session(
560        &self,
561        name: impl AsRef<str>,
562        options: impl Into<Option<DropIndexOptions>>,
563        session: &mut ClientSession,
564    ) -> Result<()> {
565        self.inner
566            .drop_index_with_session(name, options, session)
567            .await
568    }
569    #[instrument(
570    fields(
571    db.name = % self.info.database_name ,
572    db.system = "mongodb",
573    db.collection = % self.inner.name(),
574    otel.kind = "client",
575    ),
576    skip(self,options)
577    )]
578    pub async fn drop_indexes(&self, options: impl Into<Option<DropIndexOptions>>) -> Result<()> {
579        self.inner.drop_indexes(options).await
580    }
581    #[instrument(
582    fields(
583    db.name = % self.info.database_name ,
584    db.system = "mongodb",
585    db.collection = % self.inner.name(),
586    otel.kind = "client",
587    ),
588    skip(self,options,session)
589    )]
590    pub async fn drop_indexes_with_session(
591        &self,
592        options: impl Into<Option<DropIndexOptions>>,
593        session: &mut ClientSession,
594    ) -> Result<()> {
595        self.inner.drop_indexes_with_session(options, session).await
596    }
597    #[instrument(
598    fields(
599    db.name = % self.info.database_name ,
600    db.system = "mongodb",
601    db.collection = % self.inner.name(),
602    otel.kind = "client",
603    ),
604    skip(self,options)
605    )]
606    pub async fn list_indexes(
607        &self,
608        options: impl Into<Option<ListIndexesOptions>>,
609    ) -> Result<Cursor<IndexModel>> {
610        self.inner.list_indexes(options).await
611    }
612    #[instrument(
613    fields(
614    db.name = % self.info.database_name ,
615    db.system = "mongodb",
616    db.collection = % self.inner.name(),
617    otel.kind = "client",
618    ),
619    skip(self,options,session)
620    )]
621    pub async fn list_indexes_with_session(
622        &self,
623        options: impl Into<Option<ListIndexesOptions>>,
624        session: &mut ClientSession,
625    ) -> Result<SessionCursor<IndexModel>> {
626        self.inner.list_indexes_with_session(options, session).await
627    }
628    #[instrument(
629    fields(
630    db.name = % self.info.database_name ,
631    db.system = "mongodb",
632    db.collection = % self.inner.name(),
633    otel.kind = "client",
634    ),
635    skip(self)
636    )]
637    pub async fn list_index_names(&self) -> Result<Vec<String>> {
638        self.inner.list_index_names().await
639    }
640    #[instrument(
641    fields(
642    db.name = % self.info.database_name ,
643    db.system = "mongodb",
644    db.collection = % self.inner.name(),
645    otel.kind = "client",
646    ),
647    skip(self,session)
648    )]
649    pub async fn list_index_names_with_session(
650        &self,
651        session: &mut ClientSession,
652    ) -> Result<Vec<String>> {
653        self.inner.list_index_names_with_session(session).await
654    }
655    #[instrument(
656    fields(
657    db.name = % self.info.database_name ,
658    db.system = "mongodb",
659    db.collection = % self.inner.name(),
660    otel.kind = "client",
661    ),
662    skip(self,query,update,options)
663    )]
664    pub async fn update_many(
665        &self,
666        query: Document,
667        update: impl Into<UpdateModifications>,
668        options: impl Into<Option<UpdateOptions>>,
669    ) -> Result<UpdateResult> {
670        self.inner.update_many(query, update, options).await
671    }
672    #[instrument(
673    fields(
674    db.name = % self.info.database_name ,
675    db.system = "mongodb",
676    db.collection = % self.inner.name(),
677    otel.kind = "client",
678    ),
679    skip(self,query,update,options,session)
680    )]
681    pub async fn update_many_with_session(
682        &self,
683        query: Document,
684        update: impl Into<UpdateModifications>,
685        options: impl Into<Option<UpdateOptions>>,
686        session: &mut ClientSession,
687    ) -> Result<UpdateResult> {
688        self.inner
689            .update_many_with_session(query, update, options, session)
690            .await
691    }
692    #[instrument(
693    fields(
694    db.name = % self.info.database_name ,
695    db.system = "mongodb",
696    db.collection = % self.inner.name(),
697    otel.kind = "client",
698    ),
699    skip(self,query,update,options,session)
700    )]
701    pub async fn update_one_with_session(
702        &self,
703        query: Document,
704        update: impl Into<UpdateModifications>,
705        options: impl Into<Option<UpdateOptions>>,
706        session: &mut ClientSession,
707    ) -> Result<UpdateResult> {
708        self.inner
709            .update_one_with_session(query, update, options, session)
710            .await
711    }
712    #[instrument(
713    fields(
714    db.name = % self.info.database_name ,
715    db.system = "mongodb",
716    db.collection = % self.inner.name(),
717    otel.kind = "client",
718    ),
719    skip(self,pipeline,options)
720    )]
721    pub async fn watch(
722        &self,
723        pipeline: impl IntoIterator<Item = Document>,
724        options: impl Into<Option<ChangeStreamOptions>>,
725    ) -> Result<ChangeStream<ChangeStreamEvent<T>>>
726    where
727        T: DeserializeOwned + Unpin + Send + Sync,
728    {
729        self.inner.watch(pipeline, options).await
730    }
731    #[instrument(
732    fields(
733    db.name = % self.info.database_name ,
734    db.system = "mongodb",
735    db.collection = % self.inner.name(),
736    otel.kind = "client",
737    ),
738    skip(self,pipeline,options,session)
739    )]
740    pub async fn watch_with_session(
741        &self,
742        pipeline: impl IntoIterator<Item = Document>,
743        options: impl Into<Option<ChangeStreamOptions>>,
744        session: &mut ClientSession,
745    ) -> Result<SessionChangeStream<ChangeStreamEvent<T>>>
746    where
747        T: DeserializeOwned + Unpin + Send + Sync,
748    {
749        self.inner
750            .watch_with_session(pipeline, options, session)
751            .await
752    }
753    #[instrument(
754    fields(
755    db.name = % self.info.database_name ,
756    db.system = "mongodb",
757    db.collection = % self.inner.name(),
758    otel.kind = "client",
759    ),
760    skip(self,filter,options)
761    )]
762    pub async fn find(
763        &self,
764        filter: impl Into<Option<Document>>,
765        options: impl Into<Option<FindOptions>>,
766    ) -> Result<Cursor<T>> {
767        self.inner.find(filter, options).await
768    }
769    #[instrument(
770    fields(
771    db.name = % self.info.database_name ,
772    db.system = "mongodb",
773    db.collection = % self.inner.name(),
774    otel.kind = "client",
775    ),
776    skip(self,filter,options,session)
777    )]
778    pub async fn find_with_session(
779        &self,
780        filter: impl Into<Option<Document>>,
781        options: impl Into<Option<FindOptions>>,
782        session: &mut ClientSession,
783    ) -> Result<SessionCursor<T>> {
784        self.inner.find_with_session(filter, options, session).await
785    }
786}