// scrapfly_sdk/schedule.rs

//! Public schedule client — wraps `/scrape/schedules`, `/screenshot/schedules`,
//! `/crawl/schedules` and the cross-kind `/schedules` endpoints.
//!
//! Mirrors the Go and Python SDKs: the kind-specific configuration is supplied
//! to the matching `create_*_schedule` method; the cross-kind list/get/update/
//! cancel/pause/resume/execute methods work on any schedule by id.
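//!
//! # Example
//!
//! An illustrative sketch of creating a recurring scrape schedule. The import
//! paths are assumptions about where this crate re-exports `Client`,
//! `ScrapflyError`, and the schedule types; adjust them to the crate's actual
//! public surface.
//!
//! ```no_run
//! use std::collections::HashMap;
//!
//! use serde_json::{json, Value};
//!
//! use scrapfly_sdk::client::Client;
//! use scrapfly_sdk::error::ScrapflyError;
//! use scrapfly_sdk::schedule::{CreateScheduleRequest, ScheduleRecurrence};
//!
//! async fn nightly_scrape(client: &Client) -> Result<(), ScrapflyError> {
//!     // Kind-specific config travels as a plain JSON map.
//!     let mut scrape_config: HashMap<String, Value> = HashMap::new();
//!     scrape_config.insert("url".into(), json!("https://example.com"));
//!
//!     // Fire once a day, delivering each result to the named webhook.
//!     let request = CreateScheduleRequest {
//!         webhook_name: "my-webhook".into(),
//!         recurrence: Some(ScheduleRecurrence {
//!             interval: Some(1),
//!             unit: Some("day".into()),
//!             ..Default::default()
//!         }),
//!         ..Default::default()
//!     };
//!     let schedule = client.create_scrape_schedule(scrape_config, &request).await?;
//!     println!("created schedule {} ({})", schedule.id, schedule.status);
//!     Ok(())
//! }
//! ```
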
use std::collections::HashMap;

use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::client::Client;
use crate::error::{from_response, ScrapflyError};

/// Bounds a recurring schedule by either a date or a fire count.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ScheduleEnd {
    /// End mode: `"date"` (stop at `date`) or `"count"` (stop after `count` fires).
    #[serde(rename = "type")]
    pub kind: String,
    /// ISO8601 datetime at which the schedule stops firing (when `kind == "date"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub date: Option<String>,
    /// Number of remaining fires before the schedule stops (when `kind == "count"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub count: Option<i64>,
}

/// When a schedule fires next. Cron mode wins when `cron` is set; otherwise
/// `interval` + `unit` drive the cadence.
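///
/// # Example
///
/// An illustrative construction (the import path is an assumption): fire every
/// 2 hours and stop after 10 fires.
///
/// ```
/// use scrapfly_sdk::schedule::{ScheduleEnd, ScheduleRecurrence};
///
/// let recurrence = ScheduleRecurrence {
///     interval: Some(2),
///     unit: Some("hour".into()),
///     ends: Some(ScheduleEnd {
///         kind: "count".into(),
///         count: Some(10),
///         ..Default::default()
///     }),
///     ..Default::default()
/// };
/// ```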
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ScheduleRecurrence {
    /// Cron expression evaluated in UTC. Wins over `interval`/`unit` when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cron: Option<String>,
    /// Numeric component of the cadence (e.g. `5` for "every 5 minutes").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub interval: Option<i64>,
    /// Cadence unit: `"minute" | "hour" | "day" | "week" | "month"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub unit: Option<String>,
    /// Optional weekday filter (e.g. `["mon", "wed", "fri"]`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub days: Option<Vec<String>>,
    /// Optional end-of-recurrence condition.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ends: Option<ScheduleEnd>,
}

/// Public-facing request envelope for creating a schedule. The kind-specific
/// config is supplied as a separate argument.
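///
/// # Example
///
/// An illustrative one-shot request (the import path is an assumption): fire
/// once at a fixed instant instead of on a recurrence.
///
/// ```
/// use scrapfly_sdk::schedule::CreateScheduleRequest;
///
/// let request = CreateScheduleRequest {
///     webhook_name: "my-webhook".into(),
///     scheduled_date: Some("2025-01-01T00:00:00Z".into()),
///     retry_on_failure: true,
///     max_retries: Some(3),
///     ..Default::default()
/// };
/// ```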
#[derive(Debug, Clone, Serialize, Default)]
pub struct CreateScheduleRequest {
    /// Name of the webhook to deliver each fire's result to.
    pub webhook_name: String,
    /// Recurring cadence. Mutually exclusive with `scheduled_date` (one-shot).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recurrence: Option<ScheduleRecurrence>,
    /// One-shot fire datetime (ISO8601). Mutually exclusive with `recurrence`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduled_date: Option<String>,
    /// When true, multiple fires of the same schedule may run concurrently.
    pub allow_concurrency: bool,
    /// When true, failed fires are retried up to `max_retries` times.
    pub retry_on_failure: bool,
    /// Cap on retries per fire when `retry_on_failure` is set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_retries: Option<i64>,
    /// Free-form description shown on the dashboard.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub notes: Option<String>,
}

/// PATCH payload. Only fields explicitly set are forwarded.
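///
/// # Example
///
/// An illustrative partial update (the import path is an assumption). Only the
/// fields set here are serialized; everything else on the schedule is left
/// untouched.
///
/// ```
/// use scrapfly_sdk::schedule::UpdateScheduleRequest;
///
/// let patch = UpdateScheduleRequest {
///     notes: Some("tightened retry budget".into()),
///     max_retries: Some(2),
///     ..Default::default()
/// };
/// ```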
#[derive(Debug, Clone, Serialize, Default)]
pub struct UpdateScheduleRequest {
    /// Replace the recurrence cadence.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recurrence: Option<ScheduleRecurrence>,
    /// Replace the one-shot fire datetime.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduled_date: Option<String>,
    /// Replace the concurrency flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub allow_concurrency: Option<bool>,
    /// Replace the retry-on-failure flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry_on_failure: Option<bool>,
    /// Replace the per-fire retry cap.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_retries: Option<i64>,
    /// Replace the dashboard notes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub notes: Option<String>,
    /// Replace the scrape config (only valid for scrape schedules).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scrape_config: Option<HashMap<String, Value>>,
    /// Replace the screenshot config (only valid for screenshot schedules).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub screenshot_config: Option<HashMap<String, Value>>,
    /// Replace the crawler config (only valid for crawler schedules).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub crawler_config: Option<HashMap<String, Value>>,
}

/// Server-side schedule record. Returned by every read or mutation endpoint.
#[derive(Debug, Clone, Deserialize)]
pub struct Schedule {
    /// Server-issued schedule UUID.
    pub id: String,
    /// Kind of the underlying job: `"scrape" | "screenshot" | "crawler"`.
    pub kind: String,
    /// Lifecycle status: `"ACTIVE" | "PAUSED" | "CANCELLED"`.
    pub status: String,
    /// ISO8601 datetime of the next planned fire (recurring schedules).
    #[serde(default)]
    pub next_scheduled_date: Option<String>,
    /// ISO8601 datetime of the one-shot fire (one-shot schedules).
    #[serde(default)]
    pub scheduled_date: Option<String>,
    /// Recurrence cadence for recurring schedules.
    #[serde(default)]
    pub recurrence: Option<ScheduleRecurrence>,
    /// Free-form server-side metadata bag.
    #[serde(default)]
    pub metadata: Option<HashMap<String, Value>>,
    /// Free-form notes attached at create / update time.
    #[serde(default)]
    pub notes: Option<String>,
    /// User UUID that authored the schedule.
    #[serde(default)]
    pub created_by: Option<String>,
    /// ISO8601 datetime of creation.
    pub created_at: String,
    /// ISO8601 datetime of the last update.
    pub updated_at: String,
    /// ISO8601 datetime of cancellation (set when `status == "CANCELLED"`).
    #[serde(default)]
    pub cancelled_at: Option<String>,
    /// Whether overlapping fires are permitted.
    pub allow_concurrency: bool,
    /// Whether failed fires are retried.
    pub retry_on_failure: bool,
    /// Per-fire retry cap when `retry_on_failure` is set.
    pub max_retries: i64,
    /// UUID of the webhook receiving each fire's result.
    #[serde(default)]
    pub webhook_uuid: Option<String>,
    /// UUID of the owning user.
    #[serde(default)]
    pub user_uuid: Option<String>,
    /// Consecutive failure counter (auto-cancels after a server-defined cap).
    #[serde(default)]
    pub consecutive_failures: Option<i64>,
}

/// Optional filters for [`Client::list_schedules`].
#[derive(Debug, Clone, Default)]
pub struct ListSchedulesOptions {
    /// Restrict to a single status (`"ACTIVE" | "PAUSED" | "CANCELLED"`).
    pub status: Option<String>,
    /// Restrict to a single kind (`"scrape" | "screenshot" | "crawler"`).
    pub kind: Option<String>,
}

impl Client {
    /// Create a Web Scraping API schedule.
    pub async fn create_scrape_schedule(
        &self,
        scrape_config: HashMap<String, Value>,
        request: &CreateScheduleRequest,
    ) -> Result<Schedule, ScrapflyError> {
        self.create_schedule_inner("/scrape/schedules", "scrape_config", scrape_config, request)
            .await
    }

    /// Create a Screenshot API schedule.
    pub async fn create_screenshot_schedule(
        &self,
        screenshot_config: HashMap<String, Value>,
        request: &CreateScheduleRequest,
    ) -> Result<Schedule, ScrapflyError> {
        self.create_schedule_inner(
            "/screenshot/schedules",
            "screenshot_config",
            screenshot_config,
            request,
        )
        .await
    }

    /// Create a Crawler API schedule.
    pub async fn create_crawler_schedule(
        &self,
        crawler_config: HashMap<String, Value>,
        request: &CreateScheduleRequest,
    ) -> Result<Schedule, ScrapflyError> {
        self.create_schedule_inner(
            "/crawl/schedules",
            "crawler_config",
            crawler_config,
            request,
        )
        .await
    }

    /// Return a schedule by id (works across all kinds).
    pub async fn get_schedule(&self, id: &str) -> Result<Schedule, ScrapflyError> {
        let path = format!("/schedules/{}", url_path_escape(id));
        self.schedule_request_json::<Schedule>(Method::GET, &path, &[], None)
            .await
    }

    /// List every schedule on the account, optionally filtered by kind/status.
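    ///
    /// # Example
    ///
    /// An illustrative filtered listing; the import paths are assumptions
    /// about where this crate re-exports its types.
    ///
    /// ```no_run
    /// # use scrapfly_sdk::client::Client;
    /// # use scrapfly_sdk::error::ScrapflyError;
    /// use scrapfly_sdk::schedule::ListSchedulesOptions;
    ///
    /// # async fn demo(client: &Client) -> Result<(), ScrapflyError> {
    /// // Only paused crawler schedules.
    /// let opts = ListSchedulesOptions {
    ///     status: Some("PAUSED".into()),
    ///     kind: Some("crawler".into()),
    /// };
    /// for schedule in client.list_schedules(Some(&opts)).await? {
    ///     println!("{} {} {}", schedule.id, schedule.kind, schedule.status);
    /// }
    /// # Ok(())
    /// # }
    /// ```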
    pub async fn list_schedules(
        &self,
        opts: Option<&ListSchedulesOptions>,
    ) -> Result<Vec<Schedule>, ScrapflyError> {
        self.list_schedules_inner("/schedules", opts).await
    }

    /// List scrape schedules, optionally filtered by `status`.
    pub async fn list_scrape_schedules(
        &self,
        status: Option<&str>,
    ) -> Result<Vec<Schedule>, ScrapflyError> {
        self.list_schedules_inner(
            "/scrape/schedules",
            status
                .map(|s| ListSchedulesOptions {
                    status: Some(s.into()),
                    kind: None,
                })
                .as_ref(),
        )
        .await
    }

    /// List screenshot schedules, optionally filtered by `status`.
    pub async fn list_screenshot_schedules(
        &self,
        status: Option<&str>,
    ) -> Result<Vec<Schedule>, ScrapflyError> {
        self.list_schedules_inner(
            "/screenshot/schedules",
            status
                .map(|s| ListSchedulesOptions {
                    status: Some(s.into()),
                    kind: None,
                })
                .as_ref(),
        )
        .await
    }

    /// List crawler schedules, optionally filtered by `status`.
    pub async fn list_crawler_schedules(
        &self,
        status: Option<&str>,
    ) -> Result<Vec<Schedule>, ScrapflyError> {
        self.list_schedules_inner(
            "/crawl/schedules",
            status
                .map(|s| ListSchedulesOptions {
                    status: Some(s.into()),
                    kind: None,
                })
                .as_ref(),
        )
        .await
    }

    /// Patch an active schedule. Only fields set in `request` change.
    pub async fn update_schedule(
        &self,
        id: &str,
        request: &UpdateScheduleRequest,
    ) -> Result<Schedule, ScrapflyError> {
        let path = format!("/schedules/{}", url_path_escape(id));
        let body = serde_json::to_vec(request)?;
        self.schedule_request_json::<Schedule>(Method::PATCH, &path, &[], Some(body))
            .await
    }

    /// Cancel a schedule. Cancellation is terminal (returns no body).
    pub async fn cancel_schedule(&self, id: &str) -> Result<(), ScrapflyError> {
        let path = format!("/schedules/{}", url_path_escape(id));
        self.schedule_request_empty(Method::DELETE, &path, &[], None)
            .await
    }

    /// Pause an active schedule. Idempotent on already-paused schedules.
    pub async fn pause_schedule(&self, id: &str) -> Result<Schedule, ScrapflyError> {
        let path = format!("/schedules/{}/pause", url_path_escape(id));
        self.schedule_request_json::<Schedule>(Method::POST, &path, &[], None)
            .await
    }

    /// Resume a paused schedule. Idempotent on already-active schedules.
    pub async fn resume_schedule(&self, id: &str) -> Result<Schedule, ScrapflyError> {
        let path = format!("/schedules/{}/resume", url_path_escape(id));
        self.schedule_request_json::<Schedule>(Method::POST, &path, &[], None)
            .await
    }

    /// Fire a schedule immediately, regardless of `next_scheduled_date`.
    pub async fn execute_schedule(&self, id: &str) -> Result<Schedule, ScrapflyError> {
        let path = format!("/schedules/{}/execute", url_path_escape(id));
        self.schedule_request_json::<Schedule>(Method::POST, &path, &[], None)
            .await
    }

    // ---- internals ------------------------------------------------------

    async fn create_schedule_inner(
        &self,
        path: &str,
        config_key: &'static str,
        config: HashMap<String, Value>,
        request: &CreateScheduleRequest,
    ) -> Result<Schedule, ScrapflyError> {
        let mut body = serde_json::Map::new();
        body.insert(config_key.to_string(), Value::Object(map_to_object(config)));
        body.insert(
            "webhook_name".into(),
            Value::String(request.webhook_name.clone()),
        );
        body.insert(
            "allow_concurrency".into(),
            Value::Bool(request.allow_concurrency),
        );
        body.insert(
            "retry_on_failure".into(),
            Value::Bool(request.retry_on_failure),
        );
        if let Some(rec) = &request.recurrence {
            body.insert("recurrence".into(), serde_json::to_value(rec)?);
        }
        if let Some(d) = &request.scheduled_date {
            body.insert("scheduled_date".into(), Value::String(d.clone()));
        }
        if let Some(n) = request.max_retries {
            body.insert("max_retries".into(), Value::Number(n.into()));
        }
        if let Some(n) = &request.notes {
            body.insert("notes".into(), Value::String(n.clone()));
        }
        let payload = serde_json::to_vec(&Value::Object(body))?;
        self.schedule_request_json::<Schedule>(Method::POST, path, &[], Some(payload))
            .await
    }

    async fn list_schedules_inner(
        &self,
        path: &str,
        opts: Option<&ListSchedulesOptions>,
    ) -> Result<Vec<Schedule>, ScrapflyError> {
        let mut query: Vec<(String, String)> = Vec::new();
        if let Some(o) = opts {
            if let Some(s) = &o.status {
                query.push(("status".into(), s.clone()));
            }
            if let Some(k) = &o.kind {
                query.push(("kind".into(), k.clone()));
            }
        }
        self.schedule_request_json::<Vec<Schedule>>(Method::GET, path, &query, None)
            .await
    }

    async fn schedule_request_json<T: for<'de> serde::Deserialize<'de>>(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        body: Option<Vec<u8>>,
    ) -> Result<T, ScrapflyError> {
        let (status, body_bytes) = self.schedule_send(method, path, query, body).await?;
        if status == 204 {
            return Err(ScrapflyError::Config(
                "schedule endpoint returned 204 but a JSON body was expected".into(),
            ));
        }
        if status >= 400 {
            return Err(from_response(status, &body_bytes, 0, false));
        }
        Ok(serde_json::from_slice(&body_bytes)?)
    }

    async fn schedule_request_empty(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        body: Option<Vec<u8>>,
    ) -> Result<(), ScrapflyError> {
        let (status, body_bytes) = self.schedule_send(method, path, query, body).await?;
        if status >= 400 {
            return Err(from_response(status, &body_bytes, 0, false));
        }
        Ok(())
    }

    /// Shared transport: build the public-API URL, attach JSON headers, send,
    /// and return the raw status code plus response bytes.
    async fn schedule_send(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        body: Option<Vec<u8>>,
    ) -> Result<(u16, bytes::Bytes), ScrapflyError> {
        let url = self.build_url_public(path, query)?;
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(
            reqwest::header::ACCEPT,
            reqwest::header::HeaderValue::from_static("application/json"),
        );
        if body.is_some() {
            headers.insert(
                reqwest::header::CONTENT_TYPE,
                reqwest::header::HeaderValue::from_static("application/json"),
            );
        }
        let resp = self
            .send_simple_public(method, url, Some(headers), body)
            .await?;
        let status = resp.status().as_u16();
        let bytes = resp.bytes().await.map_err(ScrapflyError::Transport)?;
        Ok((status, bytes))
    }
}

fn map_to_object(map: HashMap<String, Value>) -> serde_json::Map<String, Value> {
    map.into_iter().collect()
}

// url_path_escape is a minimal percent-encoder for path segments: every byte
// outside the RFC 3986 unreserved set (`A-Z a-z 0-9 - . _ ~`) is escaped,
// which covers `/?#`, whitespace, and control chars. Server-issued ids are
// UUIDs in practice; this exists for defense in depth.
fn url_path_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char);
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}
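
// A minimal, illustrative check of the escaping behavior described above.
#[cfg(test)]
mod tests {
    use super::url_path_escape;

    #[test]
    fn escapes_non_unreserved_bytes() {
        // Unreserved bytes pass through untouched...
        assert_eq!(url_path_escape("abc-123_~."), "abc-123_~.");
        // ...while everything else is percent-encoded, so a hostile id cannot
        // break out of its URL path segment.
        assert_eq!(url_path_escape("a/b?c#d e"), "a%2Fb%3Fc%23d%20e");
    }
}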