Skip to main content

shopify_client/services/bulk_operation/
mod.rs

1pub mod remote;
2
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::{
7    common::types::{APIError, RequestCallbacks},
8    types::{
9        bulk_operation::{
10            CancelResp, CollectionExportLine, CustomerExportLine, DraftOrderExportLine,
11            GetBulkOperationResp, InventoryItemExportLine, ListBulkOperationsParams,
12            ListBulkOperationsResp, OrderExportLine, ProductExportLine, RunMutationResp,
13            RunQueryResp, StagedUploadInput, StagedUploadsCreateResp,
14        },
15        collection::CollectionQueryParams,
16        customer::CustomerQueryParams,
17        draft_order::DraftOrderQueryParams,
18        inventory::InventoryItemQueryParams,
19        order::OrderQueryParams,
20        product::ProductQueryParams,
21    },
22};
23
24pub struct BulkOperation {
25    pub shop_url: Arc<String>,
26    pub version: Arc<String>,
27    pub access_token: Arc<String>,
28    pub callbacks: Arc<RequestCallbacks>,
29}
30
31impl BulkOperation {
32    pub fn new(
33        shop_url: Arc<String>,
34        version: Arc<String>,
35        access_token: Arc<String>,
36        callbacks: Arc<RequestCallbacks>,
37    ) -> Self {
38        BulkOperation {
39            shop_url,
40            version,
41            access_token,
42            callbacks,
43        }
44    }
45
46    // region: Raw Operations
47
48    pub async fn run_query(
49        &self,
50        query: &str,
51        group_objects: Option<bool>,
52    ) -> Result<RunQueryResp, APIError> {
53        remote::run_query(
54            &self.shop_url,
55            &self.version,
56            &self.access_token,
57            &self.callbacks,
58            query,
59            group_objects,
60        )
61        .await
62    }
63
64    pub async fn run_mutation(
65        &self,
66        mutation: &str,
67        staged_upload_path: &str,
68        client_identifier: Option<&str>,
69    ) -> Result<RunMutationResp, APIError> {
70        remote::run_mutation(
71            &self.shop_url,
72            &self.version,
73            &self.access_token,
74            &self.callbacks,
75            mutation,
76            staged_upload_path,
77            client_identifier,
78        )
79        .await
80    }
81
82    pub async fn cancel(&self, id: &str) -> Result<CancelResp, APIError> {
83        remote::cancel(
84            &self.shop_url,
85            &self.version,
86            &self.access_token,
87            &self.callbacks,
88            id,
89        )
90        .await
91    }
92
93    pub async fn get(&self, id: &str) -> Result<GetBulkOperationResp, APIError> {
94        remote::get(
95            &self.shop_url,
96            &self.version,
97            &self.access_token,
98            &self.callbacks,
99            id,
100        )
101        .await
102    }
103
104    pub async fn list(
105        &self,
106        params: &ListBulkOperationsParams,
107    ) -> Result<ListBulkOperationsResp, APIError> {
108        remote::list(
109            &self.shop_url,
110            &self.version,
111            &self.access_token,
112            &self.callbacks,
113            params,
114        )
115        .await
116    }
117
118    pub async fn create_staged_upload(
119        &self,
120        input: &[StagedUploadInput],
121    ) -> Result<StagedUploadsCreateResp, APIError> {
122        remote::create_staged_upload(
123            &self.shop_url,
124            &self.version,
125            &self.access_token,
126            &self.callbacks,
127            input,
128        )
129        .await
130    }
131
132    // endregion
133
134    // region: Export Templates
135
136    pub async fn export_products(
137        &self,
138        params: Option<&ProductQueryParams>,
139    ) -> Result<RunQueryResp, APIError> {
140        let filter = params.and_then(|p| p.to_query_string());
141        let query = remote::products_query(filter.as_deref());
142        self.run_query(&query, None).await
143    }
144
145    pub async fn export_orders(
146        &self,
147        params: Option<&OrderQueryParams>,
148    ) -> Result<RunQueryResp, APIError> {
149        let filter = params.and_then(|p| p.to_query_string());
150        let query = remote::orders_query(filter.as_deref());
151        self.run_query(&query, None).await
152    }
153
154    pub async fn export_collections(
155        &self,
156        params: Option<&CollectionQueryParams>,
157    ) -> Result<RunQueryResp, APIError> {
158        let filter = params.and_then(|p| p.to_query_string());
159        let query = remote::collections_query(filter.as_deref());
160        self.run_query(&query, None).await
161    }
162
163    pub async fn export_customers(
164        &self,
165        params: Option<&CustomerQueryParams>,
166    ) -> Result<RunQueryResp, APIError> {
167        let filter = params.and_then(|p| p.to_query_string());
168        let query = remote::customers_query(filter.as_deref());
169        self.run_query(&query, None).await
170    }
171
172    pub async fn export_inventory_items(
173        &self,
174        params: Option<&InventoryItemQueryParams>,
175    ) -> Result<RunQueryResp, APIError> {
176        let filter = params.and_then(|p| p.to_query_string());
177        let query = remote::inventory_items_query(filter.as_deref());
178        self.run_query(&query, None).await
179    }
180
181    pub async fn export_draft_orders(
182        &self,
183        params: Option<&DraftOrderQueryParams>,
184    ) -> Result<RunQueryResp, APIError> {
185        let filter = params.and_then(|p| p.to_query_string());
186        let query = remote::draft_orders_query(filter.as_deref());
187        self.run_query(&query, None).await
188    }
189
190    // endregion
191
192    // region: Streaming JSONL
193
194    pub async fn stream_jsonl<F, Fut>(
195        &self,
196        url: &str,
197        batch_size: usize,
198        on_batch: F,
199    ) -> Result<(), APIError>
200    where
201        F: FnMut(Vec<String>) -> Fut,
202        Fut: Future<Output = Result<(), APIError>>,
203    {
204        download_and_batch(url, batch_size, |line| Some(line.to_string()), on_batch).await
205    }
206
207    pub async fn stream_products<F, Fut>(
208        &self,
209        url: &str,
210        batch_size: usize,
211        on_batch: F,
212    ) -> Result<(), APIError>
213    where
214        F: FnMut(Vec<ProductExportLine>) -> Fut,
215        Fut: Future<Output = Result<(), APIError>>,
216    {
217        download_and_batch(
218            url,
219            batch_size,
220            |line| ProductExportLine::parse_line(line).ok(),
221            on_batch,
222        )
223        .await
224    }
225
226    pub async fn stream_orders<F, Fut>(
227        &self,
228        url: &str,
229        batch_size: usize,
230        on_batch: F,
231    ) -> Result<(), APIError>
232    where
233        F: FnMut(Vec<OrderExportLine>) -> Fut,
234        Fut: Future<Output = Result<(), APIError>>,
235    {
236        download_and_batch(
237            url,
238            batch_size,
239            |line| OrderExportLine::parse_line(line).ok(),
240            on_batch,
241        )
242        .await
243    }
244
245    pub async fn stream_collections<F, Fut>(
246        &self,
247        url: &str,
248        batch_size: usize,
249        on_batch: F,
250    ) -> Result<(), APIError>
251    where
252        F: FnMut(Vec<CollectionExportLine>) -> Fut,
253        Fut: Future<Output = Result<(), APIError>>,
254    {
255        download_and_batch(
256            url,
257            batch_size,
258            |line| CollectionExportLine::parse_line(line).ok(),
259            on_batch,
260        )
261        .await
262    }
263
264    pub async fn stream_customers<F, Fut>(
265        &self,
266        url: &str,
267        batch_size: usize,
268        on_batch: F,
269    ) -> Result<(), APIError>
270    where
271        F: FnMut(Vec<CustomerExportLine>) -> Fut,
272        Fut: Future<Output = Result<(), APIError>>,
273    {
274        download_and_batch(
275            url,
276            batch_size,
277            |line| CustomerExportLine::parse_line(line).ok(),
278            on_batch,
279        )
280        .await
281    }
282
283    pub async fn stream_inventory_items<F, Fut>(
284        &self,
285        url: &str,
286        batch_size: usize,
287        on_batch: F,
288    ) -> Result<(), APIError>
289    where
290        F: FnMut(Vec<InventoryItemExportLine>) -> Fut,
291        Fut: Future<Output = Result<(), APIError>>,
292    {
293        download_and_batch(
294            url,
295            batch_size,
296            |line| InventoryItemExportLine::parse_line(line).ok(),
297            on_batch,
298        )
299        .await
300    }
301
302    pub async fn stream_draft_orders<F, Fut>(
303        &self,
304        url: &str,
305        batch_size: usize,
306        on_batch: F,
307    ) -> Result<(), APIError>
308    where
309        F: FnMut(Vec<DraftOrderExportLine>) -> Fut,
310        Fut: Future<Output = Result<(), APIError>>,
311    {
312        download_and_batch(
313            url,
314            batch_size,
315            |line| DraftOrderExportLine::parse_line(line).ok(),
316            on_batch,
317        )
318        .await
319    }
320
321    // endregion
322}
323
324async fn download_and_batch<T, P, F, Fut>(
325    url: &str,
326    batch_size: usize,
327    mut parse: P,
328    mut on_batch: F,
329) -> Result<(), APIError>
330where
331    P: FnMut(&str) -> Option<T>,
332    F: FnMut(Vec<T>) -> Fut,
333    Fut: Future<Output = Result<(), APIError>>,
334{
335    let mut response = reqwest::get(url)
336        .await
337        .map_err(|_| APIError::NetworkError)?;
338
339    let mut buffer = String::new();
340    let mut batch: Vec<T> = Vec::with_capacity(batch_size);
341
342    while let Some(chunk) = response.chunk().await.map_err(|_| APIError::NetworkError)? {
343        buffer.push_str(&String::from_utf8_lossy(&chunk));
344
345        while let Some(pos) = buffer.find('\n') {
346            let line = &buffer[..pos];
347            let line = line.trim();
348            if !line.is_empty() {
349                if let Some(item) = parse(line) {
350                    batch.push(item);
351                    if batch.len() >= batch_size {
352                        on_batch(std::mem::take(&mut batch)).await?;
353                        batch = Vec::with_capacity(batch_size);
354                    }
355                }
356            }
357            buffer = buffer[pos + 1..].to_string();
358        }
359    }
360
361    let remaining = buffer.trim();
362    if !remaining.is_empty() {
363        if let Some(item) = parse(remaining) {
364            batch.push(item);
365        }
366    }
367
368    if !batch.is_empty() {
369        on_batch(batch).await?;
370    }
371
372    Ok(())
373}