Skip to main content

spring_batch_rs/item/mongodb/
mongodb_reader.rs

1use std::cell::{Cell, RefCell};
2
3use mongodb::{
4    bson::{doc, oid::ObjectId, Document},
5    options::FindOptions,
6    sync::Collection,
7};
8use serde::de::DeserializeOwned;
9
10use crate::core::item::{ItemReader, ItemReaderResult};
11
12pub trait WithObjectId {
13    fn get_id(&self) -> ObjectId;
14}
15
16/// A MongoDB item reader that reads items from a MongoDB collection.
17pub struct MongodbItemReader<'a, R: Send + Sync> {
18    collection: &'a Collection<R>,
19    filter: Document,
20    options: Option<FindOptions>,
21    page_size: Option<i64>,
22    buffer: RefCell<Vec<R>>,
23    last_id: Cell<Option<ObjectId>>,
24    offset: Cell<usize>,
25}
26
27impl<I: DeserializeOwned + WithObjectId + Send + Sync> MongodbItemReader<'_, I> {
28    /// Reads a page of items from the MongoDB collection and stores them in the buffer.
29    fn read_page(&self) {
30        self.buffer.borrow_mut().clear();
31
32        let last_id = self.last_id.get();
33
34        let mut filter = self.filter.clone();
35
36        if last_id.is_some() {
37            filter.extend(doc! {"oid": { "$gt": last_id }});
38        };
39
40        let options = &self.options;
41
42        let mut cursor = self
43            .collection
44            .find(filter)
45            .with_options(options.clone())
46            .run()
47            .unwrap();
48
49        while cursor.advance().unwrap() {
50            let result = cursor.deserialize_current();
51            if let Ok(item) = result {
52                self.last_id.set(Some(item.get_id()));
53                self.buffer.borrow_mut().push(item);
54            }
55        }
56    }
57}
58
59impl<I: DeserializeOwned + Clone + WithObjectId + Send + Sync> ItemReader<I>
60    for MongodbItemReader<'_, I>
61{
62    /// Reads the next item from the MongoDB collection.
63    ///
64    /// Returns `Ok(Some(item))` if an item is read successfully,
65    /// `Ok(None)` if there are no more items to read,
66    /// or an error if reading the item fails.
67    fn read(&self) -> ItemReaderResult<I> {
68        let index = if let Some(page_size) = self.page_size {
69            self.offset.get() % (page_size as usize)
70        } else {
71            self.offset.get()
72        };
73
74        if index == 0 {
75            self.read_page();
76        }
77
78        let buffer = self.buffer.borrow();
79
80        let result = buffer.get(index);
81
82        match result {
83            Some(item) => {
84                self.offset.set(self.offset.get() + 1);
85                Ok(Some(item.clone()))
86            }
87            None => Ok(None),
88        }
89    }
90}
91
92#[derive(Default)]
93pub struct MongodbItemReaderBuilder<'a, I: Send + Sync> {
94    collection: Option<&'a Collection<I>>,
95    filter: Option<Document>,
96    page_size: Option<i64>,
97}
98
99impl<'a, I: Send + Sync> MongodbItemReaderBuilder<'a, I> {
100    /// Creates a new `MongodbItemReaderBuilder`.
101    pub fn new() -> Self {
102        Self {
103            collection: None,
104            filter: None,
105            page_size: None,
106        }
107    }
108
109    /// Sets the MongoDB collection to read from.
110    pub fn collection(mut self, collection: &'a Collection<I>) -> MongodbItemReaderBuilder<'a, I> {
111        self.collection = Some(collection);
112        self
113    }
114
115    /// Sets the filter to apply when reading items from the collection.
116    pub fn filter(mut self, filter: Document) -> MongodbItemReaderBuilder<'a, I> {
117        self.filter = Some(filter);
118        self
119    }
120
121    /// Sets the page size for reading items.
122    pub fn page_size(mut self, page_size: i64) -> MongodbItemReaderBuilder<'a, I> {
123        self.page_size = Some(page_size);
124        self
125    }
126
127    /// Builds the `MongodbItemReader` with the configured options.
128    pub fn build(&self) -> MongodbItemReader<'a, I> {
129        let buffer: Vec<I> = if let Some(page_size) = self.page_size {
130            let buffer_size = page_size.try_into().unwrap_or(1); // Or a more robust default/error handling for conversion
131            Vec::with_capacity(buffer_size)
132        } else {
133            Vec::new()
134        };
135
136        let filter = if let Some(filter) = self.filter.to_owned() {
137            filter
138        } else {
139            doc! {}
140        };
141
142        // We do not use skip because of performance issue for large dataset.
143        // It is better to sort and filter with an indexed field (_id)
144        let find_options = FindOptions::builder()
145            .sort(doc! { "oid": 1 })
146            .limit(Some(self.page_size.unwrap()))
147            .build();
148
149        MongodbItemReader {
150            collection: self.collection.unwrap(),
151            filter,
152            options: Some(find_options),
153            page_size: self.page_size,
154            buffer: RefCell::new(buffer),
155            last_id: Cell::new(None),
156            offset: Cell::new(0),
157        }
158    }
159}