1pub mod remote;
2
3use crate::common::ServiceContext;
4
5use std::future::Future;
6use std::sync::Arc;
7
8use crate::{
9 common::types::{APIError, RequestCallbacks},
10 types::{
11 bulk_operation::{
12 CancelResp, CollectionExportLine, CustomerExportLine, DraftOrderExportLine,
13 GetBulkOperationResp, InventoryItemExportLine, ListBulkOperationsParams,
14 ListBulkOperationsResp, OrderExportLine, ProductExportLine, RunMutationResp,
15 RunQueryResp, StagedUploadInput, StagedUploadsCreateResp,
16 },
17 collection::CollectionQueryParams,
18 customer::CustomerQueryParams,
19 draft_order::DraftOrderQueryParams,
20 inventory::InventoryItemQueryParams,
21 order::OrderQueryParams,
22 product::ProductQueryParams,
23 },
24};
25
/// Service handle exposing the bulk-operation calls defined in this module.
///
/// Build it with [`BulkOperation::new`] or [`BulkOperation::with_ctx`].
pub struct BulkOperation {
    /// Request context handed to the `remote::*` functions called by this
    /// type's methods. Visible inside the crate only.
    pub(crate) ctx: ServiceContext,
}
29
30impl BulkOperation {
31 pub fn new(
32 shop_url: Arc<String>,
33 version: Arc<String>,
34 access_token: Arc<String>,
35 callbacks: Arc<RequestCallbacks>,
36 ) -> Self {
37 Self::with_ctx(ServiceContext::new(
38 shop_url,
39 version,
40 access_token,
41 callbacks,
42 ))
43 }
44
45 pub fn with_ctx(ctx: ServiceContext) -> Self {
48 Self { ctx }
49 }
50
51 pub async fn run_query(
54 &self,
55 query: &str,
56 group_objects: Option<bool>,
57 ) -> Result<RunQueryResp, APIError> {
58 remote::run_query(&self.ctx, query, group_objects).await
59 }
60
61 pub async fn run_mutation(
62 &self,
63 mutation: &str,
64 staged_upload_path: &str,
65 client_identifier: Option<&str>,
66 ) -> Result<RunMutationResp, APIError> {
67 remote::run_mutation(&self.ctx, mutation, staged_upload_path, client_identifier).await
68 }
69
70 pub async fn cancel(&self, id: &str) -> Result<CancelResp, APIError> {
71 remote::cancel(&self.ctx, id).await
72 }
73
74 pub async fn get(&self, id: &str) -> Result<GetBulkOperationResp, APIError> {
75 remote::get(&self.ctx, id).await
76 }
77
78 pub async fn list(
79 &self,
80 params: &ListBulkOperationsParams,
81 ) -> Result<ListBulkOperationsResp, APIError> {
82 remote::list(&self.ctx, params).await
83 }
84
85 pub async fn create_staged_upload(
86 &self,
87 input: &[StagedUploadInput],
88 ) -> Result<StagedUploadsCreateResp, APIError> {
89 remote::create_staged_upload(&self.ctx, input).await
90 }
91
92 pub async fn export_products(
97 &self,
98 params: Option<&ProductQueryParams>,
99 ) -> Result<RunQueryResp, APIError> {
100 let filter = params.and_then(|p| p.to_query_string());
101 let query = remote::products_query(filter.as_deref());
102 self.run_query(&query, None).await
103 }
104
105 pub async fn export_orders(
106 &self,
107 params: Option<&OrderQueryParams>,
108 ) -> Result<RunQueryResp, APIError> {
109 let filter = params.and_then(|p| p.to_query_string());
110 let query = remote::orders_query(filter.as_deref());
111 self.run_query(&query, None).await
112 }
113
114 pub async fn export_collections(
115 &self,
116 params: Option<&CollectionQueryParams>,
117 ) -> Result<RunQueryResp, APIError> {
118 let filter = params.and_then(|p| p.to_query_string());
119 let query = remote::collections_query(filter.as_deref());
120 self.run_query(&query, None).await
121 }
122
123 pub async fn export_customers(
124 &self,
125 params: Option<&CustomerQueryParams>,
126 ) -> Result<RunQueryResp, APIError> {
127 let filter = params.and_then(|p| p.to_query_string());
128 let query = remote::customers_query(filter.as_deref());
129 self.run_query(&query, None).await
130 }
131
132 pub async fn export_inventory_items(
133 &self,
134 params: Option<&InventoryItemQueryParams>,
135 ) -> Result<RunQueryResp, APIError> {
136 let filter = params.and_then(|p| p.to_query_string());
137 let query = remote::inventory_items_query(filter.as_deref());
138 self.run_query(&query, None).await
139 }
140
141 pub async fn export_draft_orders(
142 &self,
143 params: Option<&DraftOrderQueryParams>,
144 ) -> Result<RunQueryResp, APIError> {
145 let filter = params.and_then(|p| p.to_query_string());
146 let query = remote::draft_orders_query(filter.as_deref());
147 self.run_query(&query, None).await
148 }
149
150 pub async fn stream_jsonl<F, Fut>(
155 &self,
156 url: &str,
157 batch_size: usize,
158 on_batch: F,
159 ) -> Result<(), APIError>
160 where
161 F: FnMut(Vec<String>) -> Fut,
162 Fut: Future<Output = Result<(), APIError>>,
163 {
164 download_and_batch(url, batch_size, |line| Some(line.to_string()), on_batch).await
165 }
166
167 pub async fn stream_products<F, Fut>(
168 &self,
169 url: &str,
170 batch_size: usize,
171 on_batch: F,
172 ) -> Result<(), APIError>
173 where
174 F: FnMut(Vec<ProductExportLine>) -> Fut,
175 Fut: Future<Output = Result<(), APIError>>,
176 {
177 download_and_batch(
178 url,
179 batch_size,
180 |line| ProductExportLine::parse_line(line).ok(),
181 on_batch,
182 )
183 .await
184 }
185
186 pub async fn stream_orders<F, Fut>(
187 &self,
188 url: &str,
189 batch_size: usize,
190 on_batch: F,
191 ) -> Result<(), APIError>
192 where
193 F: FnMut(Vec<OrderExportLine>) -> Fut,
194 Fut: Future<Output = Result<(), APIError>>,
195 {
196 download_and_batch(
197 url,
198 batch_size,
199 |line| OrderExportLine::parse_line(line).ok(),
200 on_batch,
201 )
202 .await
203 }
204
205 pub async fn stream_collections<F, Fut>(
206 &self,
207 url: &str,
208 batch_size: usize,
209 on_batch: F,
210 ) -> Result<(), APIError>
211 where
212 F: FnMut(Vec<CollectionExportLine>) -> Fut,
213 Fut: Future<Output = Result<(), APIError>>,
214 {
215 download_and_batch(
216 url,
217 batch_size,
218 |line| CollectionExportLine::parse_line(line).ok(),
219 on_batch,
220 )
221 .await
222 }
223
224 pub async fn stream_customers<F, Fut>(
225 &self,
226 url: &str,
227 batch_size: usize,
228 on_batch: F,
229 ) -> Result<(), APIError>
230 where
231 F: FnMut(Vec<CustomerExportLine>) -> Fut,
232 Fut: Future<Output = Result<(), APIError>>,
233 {
234 download_and_batch(
235 url,
236 batch_size,
237 |line| CustomerExportLine::parse_line(line).ok(),
238 on_batch,
239 )
240 .await
241 }
242
243 pub async fn stream_inventory_items<F, Fut>(
244 &self,
245 url: &str,
246 batch_size: usize,
247 on_batch: F,
248 ) -> Result<(), APIError>
249 where
250 F: FnMut(Vec<InventoryItemExportLine>) -> Fut,
251 Fut: Future<Output = Result<(), APIError>>,
252 {
253 download_and_batch(
254 url,
255 batch_size,
256 |line| InventoryItemExportLine::parse_line(line).ok(),
257 on_batch,
258 )
259 .await
260 }
261
262 pub async fn stream_draft_orders<F, Fut>(
263 &self,
264 url: &str,
265 batch_size: usize,
266 on_batch: F,
267 ) -> Result<(), APIError>
268 where
269 F: FnMut(Vec<DraftOrderExportLine>) -> Fut,
270 Fut: Future<Output = Result<(), APIError>>,
271 {
272 download_and_batch(
273 url,
274 batch_size,
275 |line| DraftOrderExportLine::parse_line(line).ok(),
276 on_batch,
277 )
278 .await
279 }
280
281 }
283
284async fn download_and_batch<T, P, F, Fut>(
285 url: &str,
286 batch_size: usize,
287 mut parse: P,
288 mut on_batch: F,
289) -> Result<(), APIError>
290where
291 P: FnMut(&str) -> Option<T>,
292 F: FnMut(Vec<T>) -> Fut,
293 Fut: Future<Output = Result<(), APIError>>,
294{
295 let mut response = reqwest::get(url)
296 .await
297 .map_err(|_| APIError::NetworkError)?;
298
299 let mut buffer = String::new();
300 let mut batch: Vec<T> = Vec::with_capacity(batch_size);
301
302 while let Some(chunk) = response.chunk().await.map_err(|_| APIError::NetworkError)? {
303 buffer.push_str(&String::from_utf8_lossy(&chunk));
304
305 while let Some(pos) = buffer.find('\n') {
306 let line = &buffer[..pos];
307 let line = line.trim();
308 if !line.is_empty() {
309 if let Some(item) = parse(line) {
310 batch.push(item);
311 if batch.len() >= batch_size {
312 on_batch(std::mem::take(&mut batch)).await?;
313 batch = Vec::with_capacity(batch_size);
314 }
315 }
316 }
317 buffer = buffer[pos + 1..].to_string();
318 }
319 }
320
321 let remaining = buffer.trim();
322 if !remaining.is_empty() {
323 if let Some(item) = parse(remaining) {
324 batch.push(item);
325 }
326 }
327
328 if !batch.is_empty() {
329 on_batch(batch).await?;
330 }
331
332 Ok(())
333}