Skip to main content

cloudconvert_sdk/
socket.rs

1use std::{borrow::Cow, collections::BTreeMap, fmt};
2
3#[cfg(feature = "socket")]
4use futures_util::{FutureExt, future::BoxFuture};
5#[cfg(feature = "socket")]
6use rust_socketio::{
7    Payload,
8    asynchronous::{Client as SocketIoClient, ClientBuilder as SocketIoClientBuilder},
9};
10use serde::Serialize;
11#[cfg(feature = "socket")]
12use serde::de::DeserializeOwned;
13#[cfg(feature = "socket")]
14use serde_json::Value;
15#[cfg(feature = "socket")]
16use tokio::sync::mpsc;
17
18#[cfg(feature = "socket")]
19use crate::{
20    Error, Result,
21    jobs::{Job, Task},
22};
23
/// Identifies a socket channel.
///
/// Every variant except `Custom` stores the identifier that
/// [`SocketChannel::name`] interpolates into a `private-…` channel name;
/// `Custom` stores a complete channel name that is used verbatim.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum SocketChannel {
    /// Named `private-job.{job_id}`.
    Job { job_id: String },
    /// Named `private-job.{job_id}.tasks`.
    JobTasks { job_id: String },
    /// Named `private-task.{task_id}`.
    Task { task_id: String },
    /// Named `private-user.{user_id}.jobs`.
    UserJobs { user_id: String },
    /// Named `private-user.{user_id}.tasks`.
    UserTasks { user_id: String },
    /// A caller-supplied channel name, returned unchanged by `name`.
    Custom(String),
}

impl SocketChannel {
    /// Builds a `Custom` channel from an already-formatted channel name.
    pub fn custom(channel: impl Into<String>) -> Self {
        let channel = channel.into();
        Self::Custom(channel)
    }

    /// Builds the `Job` channel for `job_id`.
    pub fn job(job_id: impl Into<String>) -> Self {
        let job_id = job_id.into();
        Self::Job { job_id }
    }

    /// Builds the `JobTasks` channel for `job_id`.
    pub fn job_tasks(job_id: impl Into<String>) -> Self {
        let job_id = job_id.into();
        Self::JobTasks { job_id }
    }

    /// Builds the `Task` channel for `task_id`.
    pub fn task(task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self::Task { task_id }
    }

    /// Builds the `UserJobs` channel for `user_id`.
    pub fn user_jobs(user_id: impl Into<String>) -> Self {
        let user_id = user_id.into();
        Self::UserJobs { user_id }
    }

    /// Builds the `UserTasks` channel for `user_id`.
    pub fn user_tasks(user_id: impl Into<String>) -> Self {
        let user_id = user_id.into();
        Self::UserTasks { user_id }
    }

    /// Returns the channel name.
    ///
    /// `Custom` channels borrow their stored name; every other variant
    /// allocates a freshly formatted string.
    pub fn name(&self) -> Cow<'_, str> {
        let formatted = match self {
            // The only variant that can hand out a borrow.
            Self::Custom(channel) => return Cow::Borrowed(channel.as_str()),
            Self::Job { job_id } => format!("private-job.{job_id}"),
            Self::JobTasks { job_id } => format!("private-job.{job_id}.tasks"),
            Self::Task { task_id } => format!("private-task.{task_id}"),
            Self::UserJobs { user_id } => format!("private-user.{user_id}.jobs"),
            Self::UserTasks { user_id } => format!("private-user.{user_id}.tasks"),
        };
        Cow::Owned(formatted)
    }

    /// `true` only for the `Job` variant.
    pub fn is_job(&self) -> bool {
        matches!(self, Self::Job { .. })
    }

    /// `true` only for the `JobTasks` variant.
    pub fn is_job_tasks(&self) -> bool {
        matches!(self, Self::JobTasks { .. })
    }

    /// `true` only for the `Task` variant.
    pub fn is_task(&self) -> bool {
        matches!(self, Self::Task { .. })
    }

    /// `true` only for the `UserJobs` variant.
    pub fn is_user_jobs(&self) -> bool {
        matches!(self, Self::UserJobs { .. })
    }

    /// `true` only for the `UserTasks` variant.
    pub fn is_user_tasks(&self) -> bool {
        matches!(self, Self::UserTasks { .. })
    }

    /// Returns the job id carried by `Job` and `JobTasks`, `None` otherwise.
    pub fn job_id(&self) -> Option<&str> {
        match self {
            Self::Job { job_id } | Self::JobTasks { job_id } => Some(job_id.as_str()),
            Self::Task { .. }
            | Self::UserJobs { .. }
            | Self::UserTasks { .. }
            | Self::Custom(_) => None,
        }
    }

    /// Returns the task id carried by `Task`, `None` otherwise.
    pub fn task_id(&self) -> Option<&str> {
        match self {
            Self::Task { task_id } => Some(task_id.as_str()),
            Self::Job { .. }
            | Self::JobTasks { .. }
            | Self::UserJobs { .. }
            | Self::UserTasks { .. }
            | Self::Custom(_) => None,
        }
    }

    /// Returns the user id carried by `UserJobs` and `UserTasks`, `None` otherwise.
    pub fn user_id(&self) -> Option<&str> {
        match self {
            Self::UserJobs { user_id } | Self::UserTasks { user_id } => Some(user_id.as_str()),
            Self::Job { .. }
            | Self::JobTasks { .. }
            | Self::Task { .. }
            | Self::Custom(_) => None,
        }
    }
}
122
/// The job-level socket event names this module recognises.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum JobSocketEvent {
    /// `job.created`
    Created,
    /// `job.updated`
    Updated,
    /// `job.finished`
    Finished,
    /// `job.failed`
    Failed,
}

impl JobSocketEvent {
    /// Returns the event name string for this variant.
    pub fn name(self) -> &'static str {
        match self {
            Self::Created => "job.created",
            Self::Updated => "job.updated",
            Self::Finished => "job.finished",
            Self::Failed => "job.failed",
        }
    }

    /// Parses an event name; returns `None` unless `name` matches one of the
    /// strings produced by [`JobSocketEvent::name`] exactly.
    pub fn from_name(name: &str) -> Option<Self> {
        // Look the name up through `name()` so both directions share one table.
        [Self::Created, Self::Updated, Self::Finished, Self::Failed]
            .iter()
            .copied()
            .find(|candidate| candidate.name() == name)
    }
}
152
/// The task-level socket event names this module recognises.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum TaskSocketEvent {
    /// `task.created`
    Created,
    /// `task.updated`
    Updated,
    /// `task.finished`
    Finished,
    /// `task.failed`
    Failed,
}

impl TaskSocketEvent {
    /// Returns the event name string for this variant.
    pub fn name(self) -> &'static str {
        match self {
            Self::Created => "task.created",
            Self::Updated => "task.updated",
            Self::Finished => "task.finished",
            Self::Failed => "task.failed",
        }
    }

    /// Parses an event name; returns `None` unless `name` matches one of the
    /// strings produced by [`TaskSocketEvent::name`] exactly.
    pub fn from_name(name: &str) -> Option<Self> {
        // Look the name up through `name()` so both directions share one table.
        [Self::Created, Self::Updated, Self::Finished, Self::Failed]
            .iter()
            .copied()
            .find(|candidate| candidate.name() == name)
    }
}
182
183#[derive(Clone, Debug, Eq, PartialEq)]
184#[non_exhaustive]
185pub enum SocketEventKind {
186    Job(JobSocketEvent),
187    Task(TaskSocketEvent),
188    Other(String),
189}
190
191impl SocketEventKind {
192    pub fn from_name(name: impl Into<String>) -> Self {
193        let name = name.into();
194        if let Some(event) = JobSocketEvent::from_name(&name) {
195            return Self::Job(event);
196        }
197        if let Some(event) = TaskSocketEvent::from_name(&name) {
198            return Self::Task(event);
199        }
200        Self::Other(name)
201    }
202
203    pub fn name(&self) -> &str {
204        match self {
205            Self::Job(event) => event.name(),
206            Self::Task(event) => event.name(),
207            Self::Other(event) => event.as_str(),
208        }
209    }
210
211    pub fn is_job(&self) -> bool {
212        matches!(self, Self::Job(_))
213    }
214
215    pub fn is_task(&self) -> bool {
216        matches!(self, Self::Task(_))
217    }
218}
219
220#[derive(Clone, Serialize)]
221pub struct SocketSubscription {
222    channel: String,
223    auth: SocketAuth,
224}
225
226impl SocketSubscription {
227    pub(crate) fn new(channel: impl Into<String>, api_key: &str) -> Self {
228        Self {
229            channel: channel.into(),
230            auth: SocketAuth::bearer(api_key),
231        }
232    }
233
234    pub fn channel(&self) -> &str {
235        self.channel.as_str()
236    }
237}
238
239impl fmt::Debug for SocketSubscription {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        f.debug_struct("SocketSubscription")
242            .field("channel", &self.channel)
243            .field("auth", &"REDACTED")
244            .finish()
245    }
246}
247
248#[derive(Clone, Serialize)]
249pub struct SocketAuth {
250    headers: BTreeMap<String, String>,
251}
252
253impl SocketAuth {
254    fn bearer(api_key: &str) -> Self {
255        let mut headers = BTreeMap::new();
256        headers.insert("Authorization".to_string(), format!("Bearer {api_key}"));
257        Self { headers }
258    }
259}
260
261impl fmt::Debug for SocketAuth {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.write_str("SocketAuth(REDACTED)")
264    }
265}
266
/// Returns the Socket.IO base URL: the sandbox host when `sandbox` is `true`,
/// the regular host otherwise.
pub fn socket_base_url(sandbox: bool) -> &'static str {
    const LIVE_URL: &str = "https://socketio.cloudconvert.com";
    const SANDBOX_URL: &str = "https://socketio.sandbox.cloudconvert.com";

    if sandbox { SANDBOX_URL } else { LIVE_URL }
}
274
/// One event received from the socket: its name, the channel it arrived on
/// (when the payload carried one) and the JSON data.
#[cfg(feature = "socket")]
#[derive(Clone, Debug)]
pub struct SocketEvent {
    event: String,
    channel: Option<String>,
    data: Value,
}

#[cfg(feature = "socket")]
impl SocketEvent {
    /// Returns the event name.
    pub fn event(&self) -> &str {
        &self.event
    }

    /// Returns the channel name, if the payload included one.
    pub fn channel(&self) -> Option<&str> {
        self.channel.as_deref()
    }

    /// Returns the raw JSON data of the event.
    pub fn data(&self) -> &Value {
        &self.data
    }

    /// Classifies the event name via [`SocketEventKind::from_name`].
    pub fn kind(&self) -> SocketEventKind {
        SocketEventKind::from_name(self.event.as_str())
    }

    /// Returns the exact job event this name denotes, if any.
    pub fn job_event(&self) -> Option<JobSocketEvent> {
        JobSocketEvent::from_name(self.event.as_str())
    }

    /// Returns the exact task event this name denotes, if any.
    pub fn task_event(&self) -> Option<TaskSocketEvent> {
        TaskSocketEvent::from_name(self.event.as_str())
    }

    /// `true` when the event name starts with `job.`.
    ///
    /// This is a prefix test, so it can be `true` for names that
    /// [`SocketEvent::job_event`] does not recognise.
    pub fn is_job_event(&self) -> bool {
        self.event.strip_prefix("job.").is_some()
    }

    /// `true` when the event name starts with `task.`.
    ///
    /// This is a prefix test, so it can be `true` for names that
    /// [`SocketEvent::task_event`] does not recognise.
    pub fn is_task_event(&self) -> bool {
        self.event.strip_prefix("task.").is_some()
    }

    /// `true` when the event name ends with `.finished`.
    pub fn is_finished(&self) -> bool {
        self.has_suffix(".finished")
    }

    /// `true` when the event name ends with `.created`.
    pub fn is_created(&self) -> bool {
        self.has_suffix(".created")
    }

    /// `true` when the event name ends with `.updated`.
    pub fn is_updated(&self) -> bool {
        self.has_suffix(".updated")
    }

    /// `true` when the event name ends with `.failed`.
    pub fn is_failed(&self) -> bool {
        self.has_suffix(".failed")
    }

    /// `true` when the event is either finished or failed.
    pub fn is_terminal(&self) -> bool {
        self.is_finished() || self.is_failed()
    }

    /// Decodes the `job` field of the data.
    ///
    /// Returns `Ok(None)` when the field is absent and an error when it is
    /// present but cannot be deserialized.
    pub fn job(&self) -> Result<Option<Job>> {
        decode_socket_data_field::<Job>(&self.data, "job")
    }

    /// Decodes the `task` field of the data.
    ///
    /// Returns `Ok(None)` when the field is absent and an error when it is
    /// present but cannot be deserialized.
    pub fn task(&self) -> Result<Option<Task>> {
        decode_socket_data_field::<Task>(&self.data, "task")
    }

    /// Shared suffix test behind the `is_*` status helpers.
    fn has_suffix(&self, suffix: &str) -> bool {
        self.event.ends_with(suffix)
    }

    /// Builds an event from a raw Socket.IO payload.
    ///
    /// * `Text` payloads are split by `split_socket_payload_values`.
    /// * `String` payloads are parsed as JSON, falling back to a JSON string
    ///   holding the raw text when parsing fails.
    /// * `Binary` payloads become a JSON array with one number per byte.
    // NOTE(review): the `deprecated` allowance is presumably for matching
    // `Payload::String` — confirm against the rust_socketio version in use.
    #[allow(deprecated)]
    fn from_payload(event: impl Into<String>, payload: Payload) -> Self {
        let (channel, data) = match payload {
            Payload::Text(values) => split_socket_payload_values(values),
            Payload::String(raw) => {
                let parsed = match serde_json::from_str(&raw) {
                    Ok(parsed) => parsed,
                    Err(_) => Value::String(raw),
                };
                (None, parsed)
            }
            Payload::Binary(bytes) => {
                let numbers: Vec<Value> = bytes
                    .into_iter()
                    .map(|byte| Value::Number(byte.into()))
                    .collect();
                (None, Value::Array(numbers))
            }
        };

        let event = event.into();
        Self {
            event,
            channel,
            data,
        }
    }
}
371
/// A connected Socket.IO client together with the receiving end of the
/// channel that the registered event callbacks send into.
#[cfg(feature = "socket")]
pub struct CloudConvertSocket {
    client: SocketIoClient,
    receiver: mpsc::Receiver<SocketEvent>,
}

#[cfg(feature = "socket")]
impl fmt::Debug for CloudConvertSocket {
    /// Prints fixed descriptions in place of the two fields.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut rendered = formatter.debug_struct("CloudConvertSocket");
        rendered.field("client", &"Socket.IO client");
        rendered.field("receiver", &"event receiver");
        rendered.finish()
    }
}
388
#[cfg(feature = "socket")]
impl CloudConvertSocket {
    /// Event-channel capacity used by [`CloudConvertSocket::connect`].
    const DEFAULT_EVENT_BUFFER: usize = 64;

    /// Connects to `base_url` and sends every subscription, using the default
    /// event buffer size.
    ///
    /// # Errors
    ///
    /// See [`CloudConvertSocket::connect_with_buffer`].
    pub async fn connect(
        base_url: impl Into<String>,
        subscriptions: impl IntoIterator<Item = SocketSubscription>,
    ) -> Result<Self> {
        Self::connect_with_buffer(base_url, subscriptions, Self::DEFAULT_EVENT_BUFFER).await
    }

    /// Connects to `base_url`, then sends each subscription in order.
    ///
    /// `buffer` is the capacity of the internal event channel; a value of `0`
    /// is raised to `1`.
    ///
    /// # Errors
    ///
    /// Returns a socket error when the connection cannot be established, or
    /// the error of the first subscription that fails. In the latter case the
    /// freshly opened connection is disconnected (best effort) before the
    /// error is returned.
    pub async fn connect_with_buffer(
        base_url: impl Into<String>,
        subscriptions: impl IntoIterator<Item = SocketSubscription>,
        buffer: usize,
    ) -> Result<Self> {
        let (sender, receiver) = mpsc::channel(buffer.max(1));
        let client = socket_client_builder(base_url.into(), sender)
            .connect()
            .await
            .map_err(socket_error)?;
        let socket = Self { client, receiver };

        for subscription in subscriptions {
            if let Err(error) = socket.subscribe(subscription).await {
                // The caller never receives `socket` on this path, so nobody
                // else could close it. The builder enables automatic
                // reconnection, so tear the connection down explicitly.
                // NOTE(review): assumes dropping the client handle alone does
                // not stop the connection — confirm against rust_socketio.
                // A failure to disconnect is ignored: the subscription error
                // is the one worth reporting.
                let _ = socket.disconnect().await;
                return Err(error);
            }
        }

        Ok(socket)
    }

    /// Serializes `subscription` and emits it as a `subscribe` event.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization fails or when the emit fails.
    pub async fn subscribe(&self, subscription: SocketSubscription) -> Result<()> {
        let payload = serde_json::to_value(subscription)?;
        self.client
            .emit("subscribe", payload)
            .await
            .map_err(socket_error)
    }

    /// Waits for the next event delivered by the registered callbacks.
    ///
    /// Returns whatever the internal channel's `recv` yields, i.e. `None`
    /// once that channel reports it is closed.
    pub async fn next_event(&mut self) -> Option<SocketEvent> {
        self.receiver.recv().await
    }

    /// Disconnects the underlying Socket.IO client.
    ///
    /// # Errors
    ///
    /// Returns a socket error when the client reports a failure.
    pub async fn disconnect(&self) -> Result<()> {
        self.client.disconnect().await.map_err(socket_error)
    }
}
433
/// Creates a Socket.IO client builder for `base_url` with reconnection
/// enabled and a forwarding callback registered for each of the four job and
/// four task event names.
///
/// Every callback gets its own clone of `sender`.
#[cfg(feature = "socket")]
fn socket_client_builder(
    base_url: String,
    sender: mpsc::Sender<SocketEvent>,
) -> SocketIoClientBuilder {
    let job_names = [
        JobSocketEvent::Created,
        JobSocketEvent::Updated,
        JobSocketEvent::Finished,
        JobSocketEvent::Failed,
    ]
    .map(JobSocketEvent::name);
    let task_names = [
        TaskSocketEvent::Created,
        TaskSocketEvent::Updated,
        TaskSocketEvent::Finished,
        TaskSocketEvent::Failed,
    ]
    .map(TaskSocketEvent::name);

    let configured = SocketIoClientBuilder::new(base_url)
        .reconnect(true)
        .reconnect_on_disconnect(true);

    // Job events are registered before task events, in declaration order.
    job_names
        .iter()
        .copied()
        .chain(task_names.iter().copied())
        .fold(configured, |builder, name| {
            builder.on(name, socket_event_callback(name, sender.clone()))
        })
}
458
/// Builds the callback registered for `event`: each invocation converts the
/// payload into a [`SocketEvent`] and sends it through a clone of `sender`.
///
/// The result of the send is deliberately discarded.
#[cfg(feature = "socket")]
fn socket_event_callback(
    event: &'static str,
    sender: mpsc::Sender<SocketEvent>,
) -> impl FnMut(Payload, SocketIoClient) -> BoxFuture<'static, ()> + Send + Sync + 'static {
    move |payload, _client| {
        // The returned future must be `'static`, so it owns its own sender.
        let forward = sender.clone();
        let deliver = async move {
            let socket_event = SocketEvent::from_payload(event, payload);
            let _ = forward.send(socket_event).await;
        };
        deliver.boxed()
    }
}
472
/// Splits the values of a text payload into `(channel, data)`.
///
/// * Exactly two values with a string first: the string is the channel and
///   the second value is the data.
/// * Exactly one value: no channel, that value is the data.
/// * Anything else (including an empty list): no channel, and the whole list
///   is returned as a JSON array.
///
/// The vector is owned, so matched values are moved out rather than cloned;
/// the `Value::Null` / empty string left behind by `mem::take` is dropped
/// together with the vector.
#[cfg(feature = "socket")]
fn split_socket_payload_values(mut values: Vec<Value>) -> (Option<String>, Value) {
    match values.as_mut_slice() {
        [Value::String(channel), data] => {
            (Some(std::mem::take(channel)), std::mem::take(data))
        }
        [only] => (None, std::mem::take(only)),
        _ => (None, Value::Array(values)),
    }
}
481
/// Deserializes `data[field]` into `T`.
///
/// Returns `Ok(None)` when `data.get(field)` finds nothing, `Ok(Some(_))` on
/// success, and `Error::Json` when the field is present but does not
/// deserialize into `T`.
///
/// The value is deserialized straight from the borrowed `&Value`, which
/// avoids cloning the whole JSON subtree first.
#[cfg(feature = "socket")]
fn decode_socket_data_field<T>(data: &Value, field: &'static str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    let Some(value) = data.get(field) else {
        return Ok(None);
    };

    // `&Value` is itself a serde `Deserializer`, so no owned copy is needed.
    <T as serde::Deserialize>::deserialize(value)
        .map(Some)
        .map_err(Error::Json)
}
495
/// Wraps the `Display` text of any error in `Error::Socket`.
#[cfg(feature = "socket")]
fn socket_error(error: impl fmt::Display) -> Error {
    let message = error.to_string();
    Error::Socket(message)
}
500
/// Tests for payload decoding, event classification and connection errors.
#[cfg(all(test, feature = "socket"))]
mod managed_socket_tests {
    use super::*;
    use std::time::Duration;

    use serde_json::json;

    /// A two-element text payload is split into channel and data, and the
    /// `job` field decodes.
    #[test]
    fn socket_event_decodes_channel_and_job_payload() {
        let payload = Payload::Text(vec![
            json!("private-job.job_1"),
            json!({
                "job": {
                    "id": "job_1",
                    "status": "finished",
                    "tasks": []
                }
            }),
        ]);
        let event = SocketEvent::from_payload("job.finished", payload);

        assert_eq!(event.event(), "job.finished");
        assert_eq!(event.channel(), Some("private-job.job_1"));
        assert_eq!(event.kind(), SocketEventKind::Job(JobSocketEvent::Finished));
        assert_eq!(event.job_event(), Some(JobSocketEvent::Finished));
        assert!(event.is_job_event());
        assert!(event.is_finished());
        assert!(event.is_terminal());

        let job = event.job().unwrap().unwrap();
        assert_eq!(job.id, "job_1");
    }

    /// A single-element text payload has no channel and keeps its data as-is.
    #[test]
    fn socket_event_preserves_unknown_payload_shapes() {
        let payload = Payload::Text(vec![json!({"unexpected": true})]);
        let event = SocketEvent::from_payload("task.updated", payload);

        assert_eq!(event.channel(), None);
        assert_eq!(event.data()["unexpected"], true);
        assert_eq!(event.task_event(), Some(TaskSocketEvent::Updated));
        assert!(event.is_updated());
        assert!(event.task().unwrap().is_none());
    }

    /// A payload built from a JSON string decodes its `task` field, and the
    /// status helpers follow the event name.
    #[test]
    fn socket_event_decodes_task_payload_and_status_helpers() {
        let body = json!({
            "task": {
                "id": "task_1",
                "job_id": "job_1",
                "operation": "convert",
                "status": "error"
            }
        })
        .to_string();
        let event = SocketEvent::from_payload("task.failed", Payload::from(body));

        assert_eq!(event.event(), "task.failed");
        assert_eq!(event.channel(), None);
        assert_eq!(event.kind(), SocketEventKind::Task(TaskSocketEvent::Failed));
        assert_eq!(event.task_event(), Some(TaskSocketEvent::Failed));
        assert_eq!(event.job_event(), None);
        assert!(event.is_task_event());
        assert!(!event.is_job_event());
        assert!(event.is_failed());
        assert!(event.is_terminal());

        let task = event.task().unwrap().unwrap();
        assert_eq!(task.id, "task_1");
        assert!(event.job().unwrap().is_none());
    }

    /// Covers created/updated helpers, binary and plain-string payloads, and
    /// an event name outside the known job/task sets.
    #[test]
    fn socket_event_handles_created_updated_binary_and_unknown_payloads() {
        // Channel + task object.
        let created_payload = Payload::Text(vec![
            json!("private-task.task_1"),
            json!({
                "task": {
                    "id": "task_1",
                    "job_id": "job_1",
                    "operation": "convert",
                    "status": "waiting"
                }
            }),
        ]);
        let created = SocketEvent::from_payload("task.created", created_payload);
        assert_eq!(created.channel(), Some("private-task.task_1"));
        assert!(created.is_created());
        assert!(!created.is_terminal());

        // Raw bytes become an array of numbers.
        let binary_payload = Payload::Binary(bytes::Bytes::from_static(&[1, 2, 3]));
        let binary = SocketEvent::from_payload("job.updated", binary_payload);
        assert_eq!(binary.data(), &json!([1, 2, 3]));
        assert!(binary.is_updated());

        // Text that is not valid JSON is kept as a JSON string.
        let string_payload = Payload::from("raw".to_string());
        let string = SocketEvent::from_payload("job.created", string_payload);
        assert_eq!(string.data(), &json!("raw"));
        assert!(string.is_created());

        // Three values: no channel split, the list is kept as an array.
        let unknown_payload = Payload::Text(vec![json!("one"), json!("two"), json!("three")]);
        let unknown = SocketEvent::from_payload("custom.event", unknown_payload);
        let unknown_kind = unknown.kind();
        assert_eq!(
            unknown_kind,
            SocketEventKind::Other("custom.event".to_string())
        );
        assert_eq!(unknown_kind.name(), "custom.event");
        assert!(!unknown_kind.is_job());
        assert!(!unknown_kind.is_task());
        assert!(!unknown.is_job_event());
        assert!(!unknown.is_task_event());
        assert_eq!(unknown.data(), &json!(["one", "two", "three"]));
    }

    /// A `job` field of the wrong shape surfaces as `Error::Json`.
    #[test]
    fn socket_event_reports_json_decode_errors_for_bad_payload_fields() {
        let payload = Payload::Text(vec![json!({
            "job": "not a job object"
        })]);
        let event = SocketEvent::from_payload("job.finished", payload);

        let failure = event.job().unwrap_err();
        assert!(matches!(failure, Error::Json(_)));
    }

    /// Connecting to a closed local port yields `Error::Socket` within the timeout.
    #[tokio::test]
    async fn socket_connect_reports_socket_errors_for_unavailable_local_endpoint() {
        let attempt =
            CloudConvertSocket::connect("http://127.0.0.1:1", Vec::<SocketSubscription>::new());

        let result = tokio::time::timeout(Duration::from_secs(2), attempt)
            .await
            .expect("local refused socket connection should finish quickly");

        assert!(matches!(result, Err(Error::Socket(_))));
    }
}