spring_batch_rs/item/mongodb/
mongodb_reader.rs1use std::cell::{Cell, RefCell};
2
3use mongodb::{
4 bson::{doc, oid::ObjectId, Document},
5 options::FindOptions,
6 sync::Collection,
7};
8use serde::de::DeserializeOwned;
9
10use crate::core::item::{ItemReader, ItemReaderResult};
11
12pub trait WithObjectId {
13 fn get_id(&self) -> ObjectId;
14}
15
16pub struct MongodbItemReader<'a, R: Send + Sync> {
18 collection: &'a Collection<R>,
19 filter: Document,
20 options: Option<FindOptions>,
21 page_size: Option<i64>,
22 buffer: RefCell<Vec<R>>,
23 last_id: Cell<Option<ObjectId>>,
24 offset: Cell<usize>,
25}
26
27impl<I: DeserializeOwned + WithObjectId + Send + Sync> MongodbItemReader<'_, I> {
28 fn read_page(&self) {
30 self.buffer.borrow_mut().clear();
31
32 let last_id = self.last_id.get();
33
34 let mut filter = self.filter.clone();
35
36 if last_id.is_some() {
37 filter.extend(doc! {"oid": { "$gt": last_id }});
38 };
39
40 let options = &self.options;
41
42 let mut cursor = self
43 .collection
44 .find(filter)
45 .with_options(options.clone())
46 .run()
47 .unwrap();
48
49 while cursor.advance().unwrap() {
50 let result = cursor.deserialize_current();
51 if let Ok(item) = result {
52 self.last_id.set(Some(item.get_id()));
53 self.buffer.borrow_mut().push(item);
54 }
55 }
56 }
57}
58
59impl<I: DeserializeOwned + Clone + WithObjectId + Send + Sync> ItemReader<I>
60 for MongodbItemReader<'_, I>
61{
62 fn read(&self) -> ItemReaderResult<I> {
68 let index = if let Some(page_size) = self.page_size {
69 self.offset.get() % (page_size as usize)
70 } else {
71 self.offset.get()
72 };
73
74 if index == 0 {
75 self.read_page();
76 }
77
78 let buffer = self.buffer.borrow();
79
80 let result = buffer.get(index);
81
82 match result {
83 Some(item) => {
84 self.offset.set(self.offset.get() + 1);
85 Ok(Some(item.clone()))
86 }
87 None => Ok(None),
88 }
89 }
90}
91
92#[derive(Default)]
93pub struct MongodbItemReaderBuilder<'a, I: Send + Sync> {
94 collection: Option<&'a Collection<I>>,
95 filter: Option<Document>,
96 page_size: Option<i64>,
97}
98
99impl<'a, I: Send + Sync> MongodbItemReaderBuilder<'a, I> {
100 pub fn new() -> Self {
102 Self {
103 collection: None,
104 filter: None,
105 page_size: None,
106 }
107 }
108
109 pub fn collection(mut self, collection: &'a Collection<I>) -> MongodbItemReaderBuilder<'a, I> {
111 self.collection = Some(collection);
112 self
113 }
114
115 pub fn filter(mut self, filter: Document) -> MongodbItemReaderBuilder<'a, I> {
117 self.filter = Some(filter);
118 self
119 }
120
121 pub fn page_size(mut self, page_size: i64) -> MongodbItemReaderBuilder<'a, I> {
123 self.page_size = Some(page_size);
124 self
125 }
126
127 pub fn build(&self) -> MongodbItemReader<'a, I> {
129 let buffer: Vec<I> = if let Some(page_size) = self.page_size {
130 let buffer_size = page_size.try_into().unwrap_or(1); Vec::with_capacity(buffer_size)
132 } else {
133 Vec::new()
134 };
135
136 let filter = if let Some(filter) = self.filter.to_owned() {
137 filter
138 } else {
139 doc! {}
140 };
141
142 let find_options = FindOptions::builder()
145 .sort(doc! { "oid": 1 })
146 .limit(Some(self.page_size.unwrap()))
147 .build();
148
149 MongodbItemReader {
150 collection: self.collection.unwrap(),
151 filter,
152 options: Some(find_options),
153 page_size: self.page_size,
154 buffer: RefCell::new(buffer),
155 last_id: Cell::new(None),
156 offset: Cell::new(0),
157 }
158 }
159}