spring_batch_rs/item/mongodb/mongodb_reader.rs

1use std::cell::{Cell, RefCell};
2
3use mongodb::{
4    bson::{doc, oid::ObjectId, Document},
5    options::FindOptions,
6    sync::Collection,
7};
8use serde::de::DeserializeOwned;
9
10use crate::core::item::{ItemReader, ItemReaderResult};
11
12pub trait WithObjectId {
13    fn get_id(&self) -> ObjectId;
14}
15
16/// A MongoDB item reader that reads items from a MongoDB collection.
17pub struct MongodbItemReader<'a, R> {
18    collection: &'a Collection<R>,
19    filter: Document,
20    options: FindOptions,
21    page_size: Option<i64>,
22    buffer: RefCell<Vec<R>>,
23    last_id: Cell<Option<ObjectId>>,
24    offset: Cell<usize>,
25}
26
27impl<'a, R: DeserializeOwned + WithObjectId> MongodbItemReader<'a, R> {
28    /// Reads a page of items from the MongoDB collection and stores them in the buffer.
29    fn read_page(&self) {
30        self.buffer.borrow_mut().clear();
31
32        let last_id = self.last_id.get();
33
34        let mut filter = self.filter.clone();
35
36        if last_id.is_some() {
37            filter.extend(doc! {"oid": { "$gt": last_id }});
38        };
39
40        let options = &self.options;
41
42        let mut cursor = self.collection.find(filter, options.clone()).unwrap();
43
44        while cursor.advance().unwrap() {
45            let result = cursor.deserialize_current();
46            if let Ok(item) = result {
47                self.last_id.set(Some(item.get_id()));
48                self.buffer.borrow_mut().push(item);
49            }
50        }
51    }
52}
53
54impl<'a, R: DeserializeOwned + Clone + WithObjectId> ItemReader<R> for MongodbItemReader<'a, R> {
55    /// Reads the next item from the MongoDB collection.
56    ///
57    /// Returns `Ok(Some(item))` if an item is read successfully,
58    /// `Ok(None)` if there are no more items to read,
59    /// or an error if reading the item fails.
60    fn read(&self) -> ItemReaderResult<R> {
61        let index = if let Some(page_size) = self.page_size {
62            self.offset.get() % (page_size as usize)
63        } else {
64            self.offset.get()
65        };
66
67        if index == 0 {
68            self.read_page();
69        }
70
71        let buffer = self.buffer.borrow();
72
73        let result = buffer.get(index);
74
75        match result {
76            Some(item) => {
77                self.offset.set(self.offset.get() + 1);
78                Ok(Some(item.clone()))
79            }
80            None => Ok(None),
81        }
82    }
83}
84
85#[derive(Default)]
86pub struct MongodbItemReaderBuilder<'a, R> {
87    collection: Option<&'a Collection<R>>,
88    filter: Option<Document>,
89    page_size: Option<i64>,
90}
91
92impl<'a, R> MongodbItemReaderBuilder<'a, R> {
93    /// Creates a new `MongodbItemReaderBuilder`.
94    pub fn new() -> Self {
95        Self {
96            collection: None,
97            filter: None,
98            page_size: None,
99        }
100    }
101
102    /// Sets the MongoDB collection to read from.
103    pub fn collection(mut self, collection: &'a Collection<R>) -> MongodbItemReaderBuilder<'a, R> {
104        self.collection = Some(collection);
105        self
106    }
107
108    /// Sets the filter to apply when reading items from the collection.
109    pub fn filter(mut self, filter: Document) -> MongodbItemReaderBuilder<'a, R> {
110        self.filter = Some(filter);
111        self
112    }
113
114    /// Sets the page size for reading items.
115    pub fn page_size(mut self, page_size: i64) -> MongodbItemReaderBuilder<'a, R> {
116        self.page_size = Some(page_size);
117        self
118    }
119
120    /// Builds the `MongodbItemReader` with the configured options.
121    pub fn build(&self) -> MongodbItemReader<'a, R> {
122        let buffer: Vec<R> = if let Some(page_size) = self.page_size {
123            let buffer_size = page_size.try_into().unwrap_or(1);
124            Vec::with_capacity(buffer_size)
125        } else {
126            Vec::new()
127        };
128
129        let filter = if let Some(filter) = self.filter.to_owned() {
130            filter
131        } else {
132            doc! {}
133        };
134
135        // We do not use skip because of performance issue for large dataset.
136        // It is better to sort and filter with an indexed field (_id)
137        let find_options = FindOptions::builder()
138            .sort(doc! { "oid": 1 })
139            .limit(Some(self.page_size.unwrap()))
140            .build();
141
142        MongodbItemReader {
143            collection: self.collection.unwrap(),
144            filter,
145            options: find_options,
146            page_size: self.page_size,
147            buffer: RefCell::new(buffer),
148            last_id: Cell::new(None),
149            offset: Cell::new(0),
150        }
151    }
152}