1pub mod remote;
2
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::{
7 common::types::{APIError, RequestCallbacks},
8 types::{
9 bulk_operation::{
10 CancelResp, CollectionExportLine, CustomerExportLine, DraftOrderExportLine,
11 GetBulkOperationResp, InventoryItemExportLine, ListBulkOperationsParams,
12 ListBulkOperationsResp, OrderExportLine, ProductExportLine, RunMutationResp,
13 RunQueryResp, StagedUploadInput, StagedUploadsCreateResp,
14 },
15 collection::CollectionQueryParams,
16 customer::CustomerQueryParams,
17 draft_order::DraftOrderQueryParams,
18 inventory::InventoryItemQueryParams,
19 order::OrderQueryParams,
20 product::ProductQueryParams,
21 },
22};
23
24pub struct BulkOperation {
25 pub shop_url: Arc<String>,
26 pub version: Arc<String>,
27 pub access_token: Arc<String>,
28 pub callbacks: Arc<RequestCallbacks>,
29}
30
31impl BulkOperation {
32 pub fn new(
33 shop_url: Arc<String>,
34 version: Arc<String>,
35 access_token: Arc<String>,
36 callbacks: Arc<RequestCallbacks>,
37 ) -> Self {
38 BulkOperation {
39 shop_url,
40 version,
41 access_token,
42 callbacks,
43 }
44 }
45
46 pub async fn run_query(
49 &self,
50 query: &str,
51 group_objects: Option<bool>,
52 ) -> Result<RunQueryResp, APIError> {
53 remote::run_query(
54 &self.shop_url,
55 &self.version,
56 &self.access_token,
57 &self.callbacks,
58 query,
59 group_objects,
60 )
61 .await
62 }
63
64 pub async fn run_mutation(
65 &self,
66 mutation: &str,
67 staged_upload_path: &str,
68 client_identifier: Option<&str>,
69 ) -> Result<RunMutationResp, APIError> {
70 remote::run_mutation(
71 &self.shop_url,
72 &self.version,
73 &self.access_token,
74 &self.callbacks,
75 mutation,
76 staged_upload_path,
77 client_identifier,
78 )
79 .await
80 }
81
82 pub async fn cancel(&self, id: &str) -> Result<CancelResp, APIError> {
83 remote::cancel(
84 &self.shop_url,
85 &self.version,
86 &self.access_token,
87 &self.callbacks,
88 id,
89 )
90 .await
91 }
92
93 pub async fn get(&self, id: &str) -> Result<GetBulkOperationResp, APIError> {
94 remote::get(
95 &self.shop_url,
96 &self.version,
97 &self.access_token,
98 &self.callbacks,
99 id,
100 )
101 .await
102 }
103
104 pub async fn list(
105 &self,
106 params: &ListBulkOperationsParams,
107 ) -> Result<ListBulkOperationsResp, APIError> {
108 remote::list(
109 &self.shop_url,
110 &self.version,
111 &self.access_token,
112 &self.callbacks,
113 params,
114 )
115 .await
116 }
117
118 pub async fn create_staged_upload(
119 &self,
120 input: &[StagedUploadInput],
121 ) -> Result<StagedUploadsCreateResp, APIError> {
122 remote::create_staged_upload(
123 &self.shop_url,
124 &self.version,
125 &self.access_token,
126 &self.callbacks,
127 input,
128 )
129 .await
130 }
131
132 pub async fn export_products(
137 &self,
138 params: Option<&ProductQueryParams>,
139 ) -> Result<RunQueryResp, APIError> {
140 let filter = params.and_then(|p| p.to_query_string());
141 let query = remote::products_query(filter.as_deref());
142 self.run_query(&query, None).await
143 }
144
145 pub async fn export_orders(
146 &self,
147 params: Option<&OrderQueryParams>,
148 ) -> Result<RunQueryResp, APIError> {
149 let filter = params.and_then(|p| p.to_query_string());
150 let query = remote::orders_query(filter.as_deref());
151 self.run_query(&query, None).await
152 }
153
154 pub async fn export_collections(
155 &self,
156 params: Option<&CollectionQueryParams>,
157 ) -> Result<RunQueryResp, APIError> {
158 let filter = params.and_then(|p| p.to_query_string());
159 let query = remote::collections_query(filter.as_deref());
160 self.run_query(&query, None).await
161 }
162
163 pub async fn export_customers(
164 &self,
165 params: Option<&CustomerQueryParams>,
166 ) -> Result<RunQueryResp, APIError> {
167 let filter = params.and_then(|p| p.to_query_string());
168 let query = remote::customers_query(filter.as_deref());
169 self.run_query(&query, None).await
170 }
171
172 pub async fn export_inventory_items(
173 &self,
174 params: Option<&InventoryItemQueryParams>,
175 ) -> Result<RunQueryResp, APIError> {
176 let filter = params.and_then(|p| p.to_query_string());
177 let query = remote::inventory_items_query(filter.as_deref());
178 self.run_query(&query, None).await
179 }
180
181 pub async fn export_draft_orders(
182 &self,
183 params: Option<&DraftOrderQueryParams>,
184 ) -> Result<RunQueryResp, APIError> {
185 let filter = params.and_then(|p| p.to_query_string());
186 let query = remote::draft_orders_query(filter.as_deref());
187 self.run_query(&query, None).await
188 }
189
190 pub async fn stream_jsonl<F, Fut>(
195 &self,
196 url: &str,
197 batch_size: usize,
198 on_batch: F,
199 ) -> Result<(), APIError>
200 where
201 F: FnMut(Vec<String>) -> Fut,
202 Fut: Future<Output = Result<(), APIError>>,
203 {
204 download_and_batch(url, batch_size, |line| Some(line.to_string()), on_batch).await
205 }
206
207 pub async fn stream_products<F, Fut>(
208 &self,
209 url: &str,
210 batch_size: usize,
211 on_batch: F,
212 ) -> Result<(), APIError>
213 where
214 F: FnMut(Vec<ProductExportLine>) -> Fut,
215 Fut: Future<Output = Result<(), APIError>>,
216 {
217 download_and_batch(
218 url,
219 batch_size,
220 |line| ProductExportLine::parse_line(line).ok(),
221 on_batch,
222 )
223 .await
224 }
225
226 pub async fn stream_orders<F, Fut>(
227 &self,
228 url: &str,
229 batch_size: usize,
230 on_batch: F,
231 ) -> Result<(), APIError>
232 where
233 F: FnMut(Vec<OrderExportLine>) -> Fut,
234 Fut: Future<Output = Result<(), APIError>>,
235 {
236 download_and_batch(
237 url,
238 batch_size,
239 |line| OrderExportLine::parse_line(line).ok(),
240 on_batch,
241 )
242 .await
243 }
244
245 pub async fn stream_collections<F, Fut>(
246 &self,
247 url: &str,
248 batch_size: usize,
249 on_batch: F,
250 ) -> Result<(), APIError>
251 where
252 F: FnMut(Vec<CollectionExportLine>) -> Fut,
253 Fut: Future<Output = Result<(), APIError>>,
254 {
255 download_and_batch(
256 url,
257 batch_size,
258 |line| CollectionExportLine::parse_line(line).ok(),
259 on_batch,
260 )
261 .await
262 }
263
264 pub async fn stream_customers<F, Fut>(
265 &self,
266 url: &str,
267 batch_size: usize,
268 on_batch: F,
269 ) -> Result<(), APIError>
270 where
271 F: FnMut(Vec<CustomerExportLine>) -> Fut,
272 Fut: Future<Output = Result<(), APIError>>,
273 {
274 download_and_batch(
275 url,
276 batch_size,
277 |line| CustomerExportLine::parse_line(line).ok(),
278 on_batch,
279 )
280 .await
281 }
282
283 pub async fn stream_inventory_items<F, Fut>(
284 &self,
285 url: &str,
286 batch_size: usize,
287 on_batch: F,
288 ) -> Result<(), APIError>
289 where
290 F: FnMut(Vec<InventoryItemExportLine>) -> Fut,
291 Fut: Future<Output = Result<(), APIError>>,
292 {
293 download_and_batch(
294 url,
295 batch_size,
296 |line| InventoryItemExportLine::parse_line(line).ok(),
297 on_batch,
298 )
299 .await
300 }
301
302 pub async fn stream_draft_orders<F, Fut>(
303 &self,
304 url: &str,
305 batch_size: usize,
306 on_batch: F,
307 ) -> Result<(), APIError>
308 where
309 F: FnMut(Vec<DraftOrderExportLine>) -> Fut,
310 Fut: Future<Output = Result<(), APIError>>,
311 {
312 download_and_batch(
313 url,
314 batch_size,
315 |line| DraftOrderExportLine::parse_line(line).ok(),
316 on_batch,
317 )
318 .await
319 }
320
321 }
323
324async fn download_and_batch<T, P, F, Fut>(
325 url: &str,
326 batch_size: usize,
327 mut parse: P,
328 mut on_batch: F,
329) -> Result<(), APIError>
330where
331 P: FnMut(&str) -> Option<T>,
332 F: FnMut(Vec<T>) -> Fut,
333 Fut: Future<Output = Result<(), APIError>>,
334{
335 let mut response = reqwest::get(url)
336 .await
337 .map_err(|_| APIError::NetworkError)?;
338
339 let mut buffer = String::new();
340 let mut batch: Vec<T> = Vec::with_capacity(batch_size);
341
342 while let Some(chunk) = response.chunk().await.map_err(|_| APIError::NetworkError)? {
343 buffer.push_str(&String::from_utf8_lossy(&chunk));
344
345 while let Some(pos) = buffer.find('\n') {
346 let line = &buffer[..pos];
347 let line = line.trim();
348 if !line.is_empty() {
349 if let Some(item) = parse(line) {
350 batch.push(item);
351 if batch.len() >= batch_size {
352 on_batch(std::mem::take(&mut batch)).await?;
353 batch = Vec::with_capacity(batch_size);
354 }
355 }
356 }
357 buffer = buffer[pos + 1..].to_string();
358 }
359 }
360
361 let remaining = buffer.trim();
362 if !remaining.is_empty() {
363 if let Some(item) = parse(remaining) {
364 batch.push(item);
365 }
366 }
367
368 if !batch.is_empty() {
369 on_batch(batch).await?;
370 }
371
372 Ok(())
373}