spring_batch_rs/item/mongodb/
mongodb_reader.rs
1use std::cell::{Cell, RefCell};
2
3use mongodb::{
4 bson::{doc, oid::ObjectId, Document},
5 options::FindOptions,
6 sync::Collection,
7};
8use serde::de::DeserializeOwned;
9
10use crate::core::item::{ItemReader, ItemReaderResult};
11
12pub trait WithObjectId {
13 fn get_id(&self) -> ObjectId;
14}
15
16pub struct MongodbItemReader<'a, R> {
18 collection: &'a Collection<R>,
19 filter: Document,
20 options: FindOptions,
21 page_size: Option<i64>,
22 buffer: RefCell<Vec<R>>,
23 last_id: Cell<Option<ObjectId>>,
24 offset: Cell<usize>,
25}
26
27impl<'a, R: DeserializeOwned + WithObjectId> MongodbItemReader<'a, R> {
28 fn read_page(&self) {
30 self.buffer.borrow_mut().clear();
31
32 let last_id = self.last_id.get();
33
34 let mut filter = self.filter.clone();
35
36 if last_id.is_some() {
37 filter.extend(doc! {"oid": { "$gt": last_id }});
38 };
39
40 let options = &self.options;
41
42 let mut cursor = self.collection.find(filter, options.clone()).unwrap();
43
44 while cursor.advance().unwrap() {
45 let result = cursor.deserialize_current();
46 if let Ok(item) = result {
47 self.last_id.set(Some(item.get_id()));
48 self.buffer.borrow_mut().push(item);
49 }
50 }
51 }
52}
53
54impl<'a, R: DeserializeOwned + Clone + WithObjectId> ItemReader<R> for MongodbItemReader<'a, R> {
55 fn read(&self) -> ItemReaderResult<R> {
61 let index = if let Some(page_size) = self.page_size {
62 self.offset.get() % (page_size as usize)
63 } else {
64 self.offset.get()
65 };
66
67 if index == 0 {
68 self.read_page();
69 }
70
71 let buffer = self.buffer.borrow();
72
73 let result = buffer.get(index);
74
75 match result {
76 Some(item) => {
77 self.offset.set(self.offset.get() + 1);
78 Ok(Some(item.clone()))
79 }
80 None => Ok(None),
81 }
82 }
83}
84
85#[derive(Default)]
86pub struct MongodbItemReaderBuilder<'a, R> {
87 collection: Option<&'a Collection<R>>,
88 filter: Option<Document>,
89 page_size: Option<i64>,
90}
91
92impl<'a, R> MongodbItemReaderBuilder<'a, R> {
93 pub fn new() -> Self {
95 Self {
96 collection: None,
97 filter: None,
98 page_size: None,
99 }
100 }
101
102 pub fn collection(mut self, collection: &'a Collection<R>) -> MongodbItemReaderBuilder<'a, R> {
104 self.collection = Some(collection);
105 self
106 }
107
108 pub fn filter(mut self, filter: Document) -> MongodbItemReaderBuilder<'a, R> {
110 self.filter = Some(filter);
111 self
112 }
113
114 pub fn page_size(mut self, page_size: i64) -> MongodbItemReaderBuilder<'a, R> {
116 self.page_size = Some(page_size);
117 self
118 }
119
120 pub fn build(&self) -> MongodbItemReader<'a, R> {
122 let buffer: Vec<R> = if let Some(page_size) = self.page_size {
123 let buffer_size = page_size.try_into().unwrap_or(1);
124 Vec::with_capacity(buffer_size)
125 } else {
126 Vec::new()
127 };
128
129 let filter = if let Some(filter) = self.filter.to_owned() {
130 filter
131 } else {
132 doc! {}
133 };
134
135 let find_options = FindOptions::builder()
138 .sort(doc! { "oid": 1 })
139 .limit(Some(self.page_size.unwrap()))
140 .build();
141
142 MongodbItemReader {
143 collection: self.collection.unwrap(),
144 filter,
145 options: find_options,
146 page_size: self.page_size,
147 buffer: RefCell::new(buffer),
148 last_id: Cell::new(None),
149 offset: Cell::new(0),
150 }
151 }
152}