1use std::borrow::Borrow;
2
3use mongodb::bson::{Bson, Document};
4use mongodb::change_stream::event::ChangeStreamEvent;
5use mongodb::change_stream::session::SessionChangeStream;
6use mongodb::change_stream::ChangeStream;
7use mongodb::error::Result;
8use mongodb::options::{
9 AggregateOptions, ChangeStreamOptions, CountOptions, CreateIndexOptions, DeleteOptions,
10 DistinctOptions, DropCollectionOptions, DropIndexOptions, EstimatedDocumentCountOptions,
11 FindOneOptions, FindOptions, InsertManyOptions, InsertOneOptions, ListIndexesOptions,
12 ReadConcern, ReplaceOptions, UpdateModifications, UpdateOptions, WriteConcern,
13};
14use mongodb::results::{
15 CreateIndexResult, CreateIndexesResult, DeleteResult, InsertManyResult, InsertOneResult,
16 UpdateResult,
17};
18use mongodb::{ClientSession, Collection, Cursor, Database, IndexModel, SessionCursor};
19use serde::de::DeserializeOwned;
20use serde::Serialize;
21use tracing::instrument;
22
/// Span metadata captured once, when an instrumented collection handle is created.
#[derive(Debug, Clone)]
struct CollectionInfo {
    /// Name of the owning database; reported as the `db.name` span field.
    database_name: String,
}
26
27pub trait InstrumentedCollectionExt {
28 fn collection_instrumented<T>(&self, name: &str) -> InstrumentedCollection<T>;
29}
30
31impl InstrumentedCollectionExt for Database {
32 fn collection_instrumented<T>(&self, name: &str) -> InstrumentedCollection<T> {
33 InstrumentedCollection {
34 info: CollectionInfo {
35 database_name: self.name().parse().unwrap(),
36 },
37 inner: self.collection(name),
38 }
39 }
40}
41
42pub struct InstrumentedCollection<T> {
43 info: CollectionInfo,
44 inner: Collection<T>,
45}
46
47impl<T> InstrumentedCollection<T>
48where
49 T: DeserializeOwned + Unpin + Send + Sync,
50{
51 #[instrument(
52 fields(
53 db.name = % self.info.database_name ,
54 db.system = "mongodb",
55 db.collection = % self.inner.name(),
56 otel.kind = "client",
57 ),
58 skip(self, filter, options)
59 )]
60 pub async fn find_one(
61 &self,
62 filter: impl Into<Option<Document>>,
63 options: impl Into<Option<FindOneOptions>>,
64 ) -> Result<Option<T>> {
65 self.inner.find_one(filter, options).await
66 }
67 #[instrument(
68 fields(
69 db.name = % self.info.database_name ,
70 db.system = "mongodb",
71 db.collection = % self.inner.name(),
72 otel.kind = "client",
73 ),
74 skip(self, filter, options,session)
75 )]
76 pub async fn find_one_with_session(
77 &self,
78 filter: impl Into<Option<Document>>,
79 options: impl Into<Option<FindOneOptions>>,
80 session: &mut ClientSession,
81 ) -> Result<Option<T>> {
82 self.inner
83 .find_one_with_session(filter, options, session)
84 .await
85 }
86}
87
88impl<T> InstrumentedCollection<T>
89where
90 T: Serialize,
91{
92 #[instrument(
93 fields(
94 db.name = % self.info.database_name ,
95 db.system = "mongodb",
96 db.collection = % self.inner.name(),
97 otel.kind = "client",
98 ),
99 skip(self, docs, options)
100 )]
101 pub async fn insert_many(
102 &self,
103 docs: impl IntoIterator<Item = impl Borrow<T>>,
104 options: impl Into<Option<InsertManyOptions>>,
105 ) -> Result<InsertManyResult> {
106 self.inner.insert_many(docs, options).await
107 }
108
109 #[instrument(
110 fields(
111 db.name = % self.info.database_name ,
112 db.system = "mongodb",
113 db.collection = % self.inner.name(),
114 otel.kind = "client",
115 ),
116 skip(self, docs, options,session)
117 )]
118 pub async fn insert_many_with_session(
119 &self,
120 docs: impl IntoIterator<Item = impl Borrow<T>>,
121 options: impl Into<Option<InsertManyOptions>>,
122 session: &mut ClientSession,
123 ) -> Result<InsertManyResult> {
124 self.inner
125 .insert_many_with_session(docs, options, session)
126 .await
127 }
128 #[instrument(
129 fields(
130 db.name = % self.info.database_name ,
131 db.system = "mongodb",
132 db.collection = % self.inner.name(),
133 otel.kind = "client",
134 ),
135 skip(self, doc, options)
136 )]
137 pub async fn insert_one(
138 &self,
139 doc: impl Borrow<T>,
140 options: impl Into<Option<InsertOneOptions>>,
141 ) -> Result<InsertOneResult> {
142 self.inner.insert_one(doc, options).await
143 }
144 #[instrument(
145 fields(
146 db.name = % self.info.database_name ,
147 db.system = "mongodb",
148 db.collection = % self.inner.name(),
149 otel.kind = "client",
150 ),
151 skip(self, doc, options,session)
152 )]
153 pub async fn insert_one_with_session(
154 &self,
155 doc: impl Borrow<T>,
156 options: impl Into<Option<InsertOneOptions>>,
157 session: &mut ClientSession,
158 ) -> Result<InsertOneResult> {
159 self.inner
160 .insert_one_with_session(doc, options, session)
161 .await
162 }
163 #[instrument(
164 fields(
165 db.name = % self.info.database_name ,
166 db.system = "mongodb",
167 db.collection = % self.inner.name(),
168 otel.kind = "client",
169 ),
170 skip(self, query, replacement,options)
171 )]
172 pub async fn replace_one(
173 &self,
174 query: Document,
175 replacement: impl Borrow<T>,
176 options: impl Into<Option<ReplaceOptions>>,
177 ) -> Result<UpdateResult> {
178 self.inner.replace_one(query, replacement, options).await
179 }
180 #[instrument(
181 fields(
182 db.name = % self.info.database_name ,
183 db.system = "mongodb",
184 db.collection = % self.inner.name(),
185 otel.kind = "client",
186 ),
187 skip(self, query, replacement,options,session)
188 )]
189 pub async fn replace_one_with_session(
190 &self,
191 query: Document,
192 replacement: impl Borrow<T>,
193 options: impl Into<Option<ReplaceOptions>>,
194 session: &mut ClientSession,
195 ) -> Result<UpdateResult> {
196 self.inner
197 .replace_one_with_session(query, replacement, options, session)
198 .await
199 }
200}
201
202impl<T> InstrumentedCollection<T> {
203 #[instrument(
204 fields(
205 db.name = % self.info.database_name ,
206 db.system = "mongodb",
207 db.collection = % self.inner.name(),
208 otel.kind = "client",
209 ),
210 skip(self, query, update,options)
211 )]
212 pub async fn update_one(
213 &self,
214 query: Document,
215 update: impl Into<UpdateModifications>,
216 options: impl Into<Option<UpdateOptions>>,
217 ) -> Result<UpdateResult> {
218 self.inner.update_one(query, update, options).await
219 }
220 #[instrument(
221 fields(
222 db.name = % self.info.database_name ,
223 db.system = "mongodb",
224 db.collection = % self.inner.name(),
225 otel.kind = "client",
226 ),
227 skip(self)
228 )]
229 pub fn read_concern(&self) -> Option<&ReadConcern> {
230 self.inner.read_concern()
231 }
232 #[instrument(
233 fields(
234 db.name = % self.info.database_name ,
235 db.system = "mongodb",
236 db.collection = % self.inner.name(),
237 otel.kind = "client",
238 ),
239 skip(self)
240 )]
241 pub fn write_concern(&self) -> Option<&WriteConcern> {
242 self.inner.write_concern()
243 }
244 #[instrument(
245 fields(
246 db.name = % self.info.database_name ,
247 db.system = "mongodb",
248 db.collection = % self.inner.name(),
249 otel.kind = "client",
250 ),
251 skip(self,options)
252 )]
253 pub async fn drop(&self, options: impl Into<Option<DropCollectionOptions>>) -> Result<()> {
254 self.inner.drop(options).await
255 }
256 #[instrument(
257 fields(
258 db.name = % self.info.database_name ,
259 db.system = "mongodb",
260 db.collection = % self.inner.name(),
261 otel.kind = "client",
262 ),
263 skip(self,options,session)
264 )]
265 pub async fn drop_with_session(
266 &self,
267 options: impl Into<Option<DropCollectionOptions>>,
268 session: &mut ClientSession,
269 ) -> Result<()> {
270 self.inner.drop_with_session(options, session).await
271 }
272 #[instrument(
273 fields(
274 db.name = % self.info.database_name ,
275 db.system = "mongodb",
276 db.collection = % self.inner.name(),
277 otel.kind = "client",
278 ),
279 skip(self,pipeline,options)
280 )]
281 pub async fn aggregate(
282 &self,
283 pipeline: impl IntoIterator<Item = Document>,
284 options: impl Into<Option<AggregateOptions>>,
285 ) -> Result<Cursor<Document>> {
286 self.inner.aggregate(pipeline, options).await
287 }
288 #[instrument(
289 fields(
290 db.name = % self.info.database_name ,
291 db.system = "mongodb",
292 db.collection = % self.inner.name(),
293 otel.kind = "client",
294 ),
295 skip(self,pipeline,options,session)
296 )]
297 pub async fn aggregate_with_session(
298 &self,
299 pipeline: impl IntoIterator<Item = Document>,
300 options: impl Into<Option<AggregateOptions>>,
301 session: &mut ClientSession,
302 ) -> Result<SessionCursor<Document>> {
303 self.inner
304 .aggregate_with_session(pipeline, options, session)
305 .await
306 }
307 #[instrument(
308 fields(
309 db.name = % self.info.database_name ,
310 db.system = "mongodb",
311 db.collection = % self.inner.name(),
312 otel.kind = "client",
313 ),
314 skip(self,options)
315 )]
316 pub async fn estimated_document_count(
317 &self,
318 options: impl Into<Option<EstimatedDocumentCountOptions>>,
319 ) -> Result<u64> {
320 self.inner.estimated_document_count(options).await
321 }
322 #[instrument(
323 fields(
324 db.name = % self.info.database_name ,
325 db.system = "mongodb",
326 db.collection = % self.inner.name(),
327 otel.kind = "client",
328 ),
329 skip(self,filter,options)
330 )]
331 pub async fn count_documents(
332 &self,
333 filter: impl Into<Option<Document>>,
334 options: impl Into<Option<CountOptions>>,
335 ) -> Result<u64> {
336 self.inner.count_documents(filter, options).await
337 }
338 #[instrument(
339 fields(
340 db.name = % self.info.database_name ,
341 db.system = "mongodb",
342 db.collection = % self.inner.name(),
343 otel.kind = "client",
344 ),
345 skip(self,filter,options,session)
346 )]
347 pub async fn count_documents_with_session(
348 &self,
349 filter: impl Into<Option<Document>>,
350 options: impl Into<Option<CountOptions>>,
351 session: &mut ClientSession,
352 ) -> Result<u64> {
353 self.inner
354 .count_documents_with_session(filter, options, session)
355 .await
356 }
357 #[instrument(
358 fields(
359 db.name = % self.info.database_name ,
360 db.system = "mongodb",
361 db.collection = % self.inner.name(),
362 otel.kind = "client",
363 ),
364 skip(self,index,options)
365 )]
366 pub async fn create_index(
367 &self,
368 index: IndexModel,
369 options: impl Into<Option<CreateIndexOptions>>,
370 ) -> Result<CreateIndexResult> {
371 self.inner.create_index(index, options).await
372 }
373 #[instrument(
374 fields(
375 db.name = % self.info.database_name ,
376 db.system = "mongodb",
377 db.collection = % self.inner.name(),
378 otel.kind = "client",
379 ),
380 skip(self,index,options,session)
381 )]
382 pub async fn create_index_with_session(
383 &self,
384 index: IndexModel,
385 options: impl Into<Option<CreateIndexOptions>>,
386 session: &mut ClientSession,
387 ) -> Result<CreateIndexResult> {
388 self.inner
389 .create_index_with_session(index, options, session)
390 .await
391 }
392 #[instrument(
393 fields(
394 db.name = % self.info.database_name ,
395 db.system = "mongodb",
396 db.collection = % self.inner.name(),
397 otel.kind = "client",
398 ),
399 skip(self,indexes,options)
400 )]
401 pub async fn create_indexes(
402 &self,
403 indexes: impl IntoIterator<Item = IndexModel>,
404 options: impl Into<Option<CreateIndexOptions>>,
405 ) -> Result<CreateIndexesResult> {
406 self.inner.create_indexes(indexes, options).await
407 }
408 #[instrument(
409 fields(
410 db.name = % self.info.database_name ,
411 db.system = "mongodb",
412 db.collection = % self.inner.name(),
413 otel.kind = "client",
414 ),
415 skip(self,indexes,options,session)
416 )]
417 pub async fn create_indexes_with_session(
418 &self,
419 indexes: impl IntoIterator<Item = IndexModel>,
420 options: impl Into<Option<CreateIndexOptions>>,
421 session: &mut ClientSession,
422 ) -> Result<CreateIndexesResult> {
423 self.inner
424 .create_indexes_with_session(indexes, options, session)
425 .await
426 }
427 #[instrument(
428 fields(
429 db.name = % self.info.database_name ,
430 db.system = "mongodb",
431 db.collection = % self.inner.name(),
432 otel.kind = "client",
433 ),
434 skip(self,query,options)
435 )]
436 pub async fn delete_many(
437 &self,
438 query: Document,
439 options: impl Into<Option<DeleteOptions>>,
440 ) -> Result<DeleteResult> {
441 self.inner.delete_many(query, options).await
442 }
443 #[instrument(
444 fields(
445 db.name = % self.info.database_name ,
446 db.system = "mongodb",
447 db.collection = % self.inner.name(),
448 otel.kind = "client",
449 ),
450 skip(self,query,options,session)
451 )]
452 pub async fn delete_many_with_session(
453 &self,
454 query: Document,
455 options: impl Into<Option<DeleteOptions>>,
456 session: &mut ClientSession,
457 ) -> Result<DeleteResult> {
458 self.inner
459 .delete_many_with_session(query, options, session)
460 .await
461 }
462 #[instrument(
463 fields(
464 db.name = % self.info.database_name ,
465 db.system = "mongodb",
466 db.collection = % self.inner.name(),
467 otel.kind = "client",
468 ),
469 skip(self,query,options)
470 )]
471 pub async fn delete_one(
472 &self,
473 query: Document,
474 options: impl Into<Option<DeleteOptions>>,
475 ) -> Result<DeleteResult> {
476 self.inner.delete_one(query, options).await
477 }
478 #[instrument(
479 fields(
480 db.name = % self.info.database_name ,
481 db.system = "mongodb",
482 db.collection = % self.inner.name(),
483 otel.kind = "client",
484 ),
485 skip(self,query,options,session)
486 )]
487 pub async fn delete_one_with_session(
488 &self,
489 query: Document,
490 options: impl Into<Option<DeleteOptions>>,
491 session: &mut ClientSession,
492 ) -> Result<DeleteResult> {
493 self.inner
494 .delete_one_with_session(query, options, session)
495 .await
496 }
497 #[instrument(
498 fields(
499 db.name = % self.info.database_name ,
500 db.system = "mongodb",
501 db.collection = % self.inner.name(),
502 otel.kind = "client",
503 ),
504 skip(self,field_name,filter,options)
505 )]
506 pub async fn distinct(
507 &self,
508 field_name: impl AsRef<str>,
509 filter: impl Into<Option<Document>>,
510 options: impl Into<Option<DistinctOptions>>,
511 ) -> Result<Vec<Bson>> {
512 self.inner.distinct(field_name, filter, options).await
513 }
514 #[instrument(
515 fields(
516 db.name = % self.info.database_name ,
517 db.system = "mongodb",
518 db.collection = % self.inner.name(),
519 otel.kind = "client",
520 ),
521 skip(self,field_name,filter,options,session)
522 )]
523 pub async fn distinct_with_session(
524 &self,
525 field_name: impl AsRef<str>,
526 filter: impl Into<Option<Document>>,
527 options: impl Into<Option<DistinctOptions>>,
528 session: &mut ClientSession,
529 ) -> Result<Vec<Bson>> {
530 self.inner
531 .distinct_with_session(field_name, filter, options, session)
532 .await
533 }
534 #[instrument(
535 fields(
536 db.name = % self.info.database_name ,
537 db.system = "mongodb",
538 db.collection = % self.inner.name(),
539 otel.kind = "client",
540 ),
541 skip(self,name,options)
542 )]
543 pub async fn drop_index(
544 &self,
545 name: impl AsRef<str>,
546 options: impl Into<Option<DropIndexOptions>>,
547 ) -> Result<()> {
548 self.inner.drop_index(name, options).await
549 }
550 #[instrument(
551 fields(
552 db.name = % self.info.database_name ,
553 db.system = "mongodb",
554 db.collection = % self.inner.name(),
555 otel.kind = "client",
556 ),
557 skip(self,name,options,session)
558 )]
559 pub async fn drop_index_with_session(
560 &self,
561 name: impl AsRef<str>,
562 options: impl Into<Option<DropIndexOptions>>,
563 session: &mut ClientSession,
564 ) -> Result<()> {
565 self.inner
566 .drop_index_with_session(name, options, session)
567 .await
568 }
569 #[instrument(
570 fields(
571 db.name = % self.info.database_name ,
572 db.system = "mongodb",
573 db.collection = % self.inner.name(),
574 otel.kind = "client",
575 ),
576 skip(self,options)
577 )]
578 pub async fn drop_indexes(&self, options: impl Into<Option<DropIndexOptions>>) -> Result<()> {
579 self.inner.drop_indexes(options).await
580 }
581 #[instrument(
582 fields(
583 db.name = % self.info.database_name ,
584 db.system = "mongodb",
585 db.collection = % self.inner.name(),
586 otel.kind = "client",
587 ),
588 skip(self,options,session)
589 )]
590 pub async fn drop_indexes_with_session(
591 &self,
592 options: impl Into<Option<DropIndexOptions>>,
593 session: &mut ClientSession,
594 ) -> Result<()> {
595 self.inner.drop_indexes_with_session(options, session).await
596 }
597 #[instrument(
598 fields(
599 db.name = % self.info.database_name ,
600 db.system = "mongodb",
601 db.collection = % self.inner.name(),
602 otel.kind = "client",
603 ),
604 skip(self,options)
605 )]
606 pub async fn list_indexes(
607 &self,
608 options: impl Into<Option<ListIndexesOptions>>,
609 ) -> Result<Cursor<IndexModel>> {
610 self.inner.list_indexes(options).await
611 }
612 #[instrument(
613 fields(
614 db.name = % self.info.database_name ,
615 db.system = "mongodb",
616 db.collection = % self.inner.name(),
617 otel.kind = "client",
618 ),
619 skip(self,options,session)
620 )]
621 pub async fn list_indexes_with_session(
622 &self,
623 options: impl Into<Option<ListIndexesOptions>>,
624 session: &mut ClientSession,
625 ) -> Result<SessionCursor<IndexModel>> {
626 self.inner.list_indexes_with_session(options, session).await
627 }
628 #[instrument(
629 fields(
630 db.name = % self.info.database_name ,
631 db.system = "mongodb",
632 db.collection = % self.inner.name(),
633 otel.kind = "client",
634 ),
635 skip(self)
636 )]
637 pub async fn list_index_names(&self) -> Result<Vec<String>> {
638 self.inner.list_index_names().await
639 }
640 #[instrument(
641 fields(
642 db.name = % self.info.database_name ,
643 db.system = "mongodb",
644 db.collection = % self.inner.name(),
645 otel.kind = "client",
646 ),
647 skip(self,session)
648 )]
649 pub async fn list_index_names_with_session(
650 &self,
651 session: &mut ClientSession,
652 ) -> Result<Vec<String>> {
653 self.inner.list_index_names_with_session(session).await
654 }
655 #[instrument(
656 fields(
657 db.name = % self.info.database_name ,
658 db.system = "mongodb",
659 db.collection = % self.inner.name(),
660 otel.kind = "client",
661 ),
662 skip(self,query,update,options)
663 )]
664 pub async fn update_many(
665 &self,
666 query: Document,
667 update: impl Into<UpdateModifications>,
668 options: impl Into<Option<UpdateOptions>>,
669 ) -> Result<UpdateResult> {
670 self.inner.update_many(query, update, options).await
671 }
672 #[instrument(
673 fields(
674 db.name = % self.info.database_name ,
675 db.system = "mongodb",
676 db.collection = % self.inner.name(),
677 otel.kind = "client",
678 ),
679 skip(self,query,update,options,session)
680 )]
681 pub async fn update_many_with_session(
682 &self,
683 query: Document,
684 update: impl Into<UpdateModifications>,
685 options: impl Into<Option<UpdateOptions>>,
686 session: &mut ClientSession,
687 ) -> Result<UpdateResult> {
688 self.inner
689 .update_many_with_session(query, update, options, session)
690 .await
691 }
692 #[instrument(
693 fields(
694 db.name = % self.info.database_name ,
695 db.system = "mongodb",
696 db.collection = % self.inner.name(),
697 otel.kind = "client",
698 ),
699 skip(self,query,update,options,session)
700 )]
701 pub async fn update_one_with_session(
702 &self,
703 query: Document,
704 update: impl Into<UpdateModifications>,
705 options: impl Into<Option<UpdateOptions>>,
706 session: &mut ClientSession,
707 ) -> Result<UpdateResult> {
708 self.inner
709 .update_one_with_session(query, update, options, session)
710 .await
711 }
712 #[instrument(
713 fields(
714 db.name = % self.info.database_name ,
715 db.system = "mongodb",
716 db.collection = % self.inner.name(),
717 otel.kind = "client",
718 ),
719 skip(self,pipeline,options)
720 )]
721 pub async fn watch(
722 &self,
723 pipeline: impl IntoIterator<Item = Document>,
724 options: impl Into<Option<ChangeStreamOptions>>,
725 ) -> Result<ChangeStream<ChangeStreamEvent<T>>>
726 where
727 T: DeserializeOwned + Unpin + Send + Sync,
728 {
729 self.inner.watch(pipeline, options).await
730 }
731 #[instrument(
732 fields(
733 db.name = % self.info.database_name ,
734 db.system = "mongodb",
735 db.collection = % self.inner.name(),
736 otel.kind = "client",
737 ),
738 skip(self,pipeline,options,session)
739 )]
740 pub async fn watch_with_session(
741 &self,
742 pipeline: impl IntoIterator<Item = Document>,
743 options: impl Into<Option<ChangeStreamOptions>>,
744 session: &mut ClientSession,
745 ) -> Result<SessionChangeStream<ChangeStreamEvent<T>>>
746 where
747 T: DeserializeOwned + Unpin + Send + Sync,
748 {
749 self.inner
750 .watch_with_session(pipeline, options, session)
751 .await
752 }
753 #[instrument(
754 fields(
755 db.name = % self.info.database_name ,
756 db.system = "mongodb",
757 db.collection = % self.inner.name(),
758 otel.kind = "client",
759 ),
760 skip(self,filter,options)
761 )]
762 pub async fn find(
763 &self,
764 filter: impl Into<Option<Document>>,
765 options: impl Into<Option<FindOptions>>,
766 ) -> Result<Cursor<T>> {
767 self.inner.find(filter, options).await
768 }
769 #[instrument(
770 fields(
771 db.name = % self.info.database_name ,
772 db.system = "mongodb",
773 db.collection = % self.inner.name(),
774 otel.kind = "client",
775 ),
776 skip(self,filter,options,session)
777 )]
778 pub async fn find_with_session(
779 &self,
780 filter: impl Into<Option<Document>>,
781 options: impl Into<Option<FindOptions>>,
782 session: &mut ClientSession,
783 ) -> Result<SessionCursor<T>> {
784 self.inner.find_with_session(filter, options, session).await
785 }
786}