Skip to main content

shopify_client/admin/bulk_operation/
mod.rs

1pub mod remote;
2
3use crate::common::ServiceContext;
4
5use std::future::Future;
6use std::sync::Arc;
7
8use crate::{
9    common::types::{APIError, RequestCallbacks},
10    types::{
11        bulk_operation::{
12            CancelResp, CollectionExportLine, CustomerExportLine, DraftOrderExportLine,
13            GetBulkOperationResp, InventoryItemExportLine, ListBulkOperationsParams,
14            ListBulkOperationsResp, OrderExportLine, ProductExportLine, RunMutationResp,
15            RunQueryResp, StagedUploadInput, StagedUploadsCreateResp,
16        },
17        collection::CollectionQueryParams,
18        customer::CustomerQueryParams,
19        draft_order::DraftOrderQueryParams,
20        inventory::InventoryItemQueryParams,
21        order::OrderQueryParams,
22        product::ProductQueryParams,
23    },
24};
25
26pub struct BulkOperation {
27    pub(crate) ctx: ServiceContext,
28}
29
30impl BulkOperation {
31    pub fn new(
32        shop_url: Arc<String>,
33        version: Arc<String>,
34        access_token: Arc<String>,
35        callbacks: Arc<RequestCallbacks>,
36    ) -> Self {
37        Self::with_ctx(ServiceContext::new(
38            shop_url,
39            version,
40            access_token,
41            callbacks,
42        ))
43    }
44
45    /// Build the service from a shared `ServiceContext`. Cheaper than `new` at
46    /// construction sites that already hold a context (one `Arc` clone per service).
47    pub fn with_ctx(ctx: ServiceContext) -> Self {
48        Self { ctx }
49    }
50
51    // region: Raw Operations
52
53    pub async fn run_query(
54        &self,
55        query: &str,
56        group_objects: Option<bool>,
57    ) -> Result<RunQueryResp, APIError> {
58        remote::run_query(&self.ctx, query, group_objects).await
59    }
60
61    pub async fn run_mutation(
62        &self,
63        mutation: &str,
64        staged_upload_path: &str,
65        client_identifier: Option<&str>,
66    ) -> Result<RunMutationResp, APIError> {
67        remote::run_mutation(&self.ctx, mutation, staged_upload_path, client_identifier).await
68    }
69
70    pub async fn cancel(&self, id: &str) -> Result<CancelResp, APIError> {
71        remote::cancel(&self.ctx, id).await
72    }
73
74    pub async fn get(&self, id: &str) -> Result<GetBulkOperationResp, APIError> {
75        remote::get(&self.ctx, id).await
76    }
77
78    pub async fn list(
79        &self,
80        params: &ListBulkOperationsParams,
81    ) -> Result<ListBulkOperationsResp, APIError> {
82        remote::list(&self.ctx, params).await
83    }
84
85    pub async fn create_staged_upload(
86        &self,
87        input: &[StagedUploadInput],
88    ) -> Result<StagedUploadsCreateResp, APIError> {
89        remote::create_staged_upload(&self.ctx, input).await
90    }
91
92    // endregion
93
94    // region: Export Templates
95
96    pub async fn export_products(
97        &self,
98        params: Option<&ProductQueryParams>,
99    ) -> Result<RunQueryResp, APIError> {
100        let filter = params.and_then(|p| p.to_query_string());
101        let query = remote::products_query(filter.as_deref());
102        self.run_query(&query, None).await
103    }
104
105    pub async fn export_orders(
106        &self,
107        params: Option<&OrderQueryParams>,
108    ) -> Result<RunQueryResp, APIError> {
109        let filter = params.and_then(|p| p.to_query_string());
110        let query = remote::orders_query(filter.as_deref());
111        self.run_query(&query, None).await
112    }
113
114    pub async fn export_collections(
115        &self,
116        params: Option<&CollectionQueryParams>,
117    ) -> Result<RunQueryResp, APIError> {
118        let filter = params.and_then(|p| p.to_query_string());
119        let query = remote::collections_query(filter.as_deref());
120        self.run_query(&query, None).await
121    }
122
123    pub async fn export_customers(
124        &self,
125        params: Option<&CustomerQueryParams>,
126    ) -> Result<RunQueryResp, APIError> {
127        let filter = params.and_then(|p| p.to_query_string());
128        let query = remote::customers_query(filter.as_deref());
129        self.run_query(&query, None).await
130    }
131
132    pub async fn export_inventory_items(
133        &self,
134        params: Option<&InventoryItemQueryParams>,
135    ) -> Result<RunQueryResp, APIError> {
136        let filter = params.and_then(|p| p.to_query_string());
137        let query = remote::inventory_items_query(filter.as_deref());
138        self.run_query(&query, None).await
139    }
140
141    pub async fn export_draft_orders(
142        &self,
143        params: Option<&DraftOrderQueryParams>,
144    ) -> Result<RunQueryResp, APIError> {
145        let filter = params.and_then(|p| p.to_query_string());
146        let query = remote::draft_orders_query(filter.as_deref());
147        self.run_query(&query, None).await
148    }
149
150    // endregion
151
152    // region: Streaming JSONL
153
154    pub async fn stream_jsonl<F, Fut>(
155        &self,
156        url: &str,
157        batch_size: usize,
158        on_batch: F,
159    ) -> Result<(), APIError>
160    where
161        F: FnMut(Vec<String>) -> Fut,
162        Fut: Future<Output = Result<(), APIError>>,
163    {
164        download_and_batch(url, batch_size, |line| Some(line.to_string()), on_batch).await
165    }
166
167    pub async fn stream_products<F, Fut>(
168        &self,
169        url: &str,
170        batch_size: usize,
171        on_batch: F,
172    ) -> Result<(), APIError>
173    where
174        F: FnMut(Vec<ProductExportLine>) -> Fut,
175        Fut: Future<Output = Result<(), APIError>>,
176    {
177        download_and_batch(
178            url,
179            batch_size,
180            |line| ProductExportLine::parse_line(line).ok(),
181            on_batch,
182        )
183        .await
184    }
185
186    pub async fn stream_orders<F, Fut>(
187        &self,
188        url: &str,
189        batch_size: usize,
190        on_batch: F,
191    ) -> Result<(), APIError>
192    where
193        F: FnMut(Vec<OrderExportLine>) -> Fut,
194        Fut: Future<Output = Result<(), APIError>>,
195    {
196        download_and_batch(
197            url,
198            batch_size,
199            |line| OrderExportLine::parse_line(line).ok(),
200            on_batch,
201        )
202        .await
203    }
204
205    pub async fn stream_collections<F, Fut>(
206        &self,
207        url: &str,
208        batch_size: usize,
209        on_batch: F,
210    ) -> Result<(), APIError>
211    where
212        F: FnMut(Vec<CollectionExportLine>) -> Fut,
213        Fut: Future<Output = Result<(), APIError>>,
214    {
215        download_and_batch(
216            url,
217            batch_size,
218            |line| CollectionExportLine::parse_line(line).ok(),
219            on_batch,
220        )
221        .await
222    }
223
224    pub async fn stream_customers<F, Fut>(
225        &self,
226        url: &str,
227        batch_size: usize,
228        on_batch: F,
229    ) -> Result<(), APIError>
230    where
231        F: FnMut(Vec<CustomerExportLine>) -> Fut,
232        Fut: Future<Output = Result<(), APIError>>,
233    {
234        download_and_batch(
235            url,
236            batch_size,
237            |line| CustomerExportLine::parse_line(line).ok(),
238            on_batch,
239        )
240        .await
241    }
242
243    pub async fn stream_inventory_items<F, Fut>(
244        &self,
245        url: &str,
246        batch_size: usize,
247        on_batch: F,
248    ) -> Result<(), APIError>
249    where
250        F: FnMut(Vec<InventoryItemExportLine>) -> Fut,
251        Fut: Future<Output = Result<(), APIError>>,
252    {
253        download_and_batch(
254            url,
255            batch_size,
256            |line| InventoryItemExportLine::parse_line(line).ok(),
257            on_batch,
258        )
259        .await
260    }
261
262    pub async fn stream_draft_orders<F, Fut>(
263        &self,
264        url: &str,
265        batch_size: usize,
266        on_batch: F,
267    ) -> Result<(), APIError>
268    where
269        F: FnMut(Vec<DraftOrderExportLine>) -> Fut,
270        Fut: Future<Output = Result<(), APIError>>,
271    {
272        download_and_batch(
273            url,
274            batch_size,
275            |line| DraftOrderExportLine::parse_line(line).ok(),
276            on_batch,
277        )
278        .await
279    }
280
281    // endregion
282}
283
284async fn download_and_batch<T, P, F, Fut>(
285    url: &str,
286    batch_size: usize,
287    mut parse: P,
288    mut on_batch: F,
289) -> Result<(), APIError>
290where
291    P: FnMut(&str) -> Option<T>,
292    F: FnMut(Vec<T>) -> Fut,
293    Fut: Future<Output = Result<(), APIError>>,
294{
295    let mut response = reqwest::get(url)
296        .await
297        .map_err(|_| APIError::NetworkError)?;
298
299    let mut buffer = String::new();
300    let mut batch: Vec<T> = Vec::with_capacity(batch_size);
301
302    while let Some(chunk) = response.chunk().await.map_err(|_| APIError::NetworkError)? {
303        buffer.push_str(&String::from_utf8_lossy(&chunk));
304
305        while let Some(pos) = buffer.find('\n') {
306            let line = &buffer[..pos];
307            let line = line.trim();
308            if !line.is_empty() {
309                if let Some(item) = parse(line) {
310                    batch.push(item);
311                    if batch.len() >= batch_size {
312                        on_batch(std::mem::take(&mut batch)).await?;
313                        batch = Vec::with_capacity(batch_size);
314                    }
315                }
316            }
317            buffer = buffer[pos + 1..].to_string();
318        }
319    }
320
321    let remaining = buffer.trim();
322    if !remaining.is_empty() {
323        if let Some(item) = parse(remaining) {
324            batch.push(item);
325        }
326    }
327
328    if !batch.is_empty() {
329        on_batch(batch).await?;
330    }
331
332    Ok(())
333}