1use std::{borrow::Cow, collections::BTreeMap, fmt};
2
3#[cfg(feature = "socket")]
4use futures_util::{FutureExt, future::BoxFuture};
5#[cfg(feature = "socket")]
6use rust_socketio::{
7 Payload,
8 asynchronous::{Client as SocketIoClient, ClientBuilder as SocketIoClientBuilder},
9};
10use serde::Serialize;
11#[cfg(feature = "socket")]
12use serde::de::DeserializeOwned;
13#[cfg(feature = "socket")]
14use serde_json::Value;
15#[cfg(feature = "socket")]
16use tokio::sync::mpsc;
17
18#[cfg(feature = "socket")]
19use crate::{
20 Error, Result,
21 jobs::{Job, Task},
22};
23
/// A channel that a socket subscription can target.
///
/// The typed variants carry the identifier that [`SocketChannel::name`] embeds
/// into a `private-…` channel name, while [`SocketChannel::Custom`] stores a
/// caller-supplied channel name that is used verbatim.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum SocketChannel {
    /// Rendered as `private-job.{job_id}`.
    Job { job_id: String },
    /// Rendered as `private-job.{job_id}.tasks`.
    JobTasks { job_id: String },
    /// Rendered as `private-task.{task_id}`.
    Task { task_id: String },
    /// Rendered as `private-user.{user_id}.jobs`.
    UserJobs { user_id: String },
    /// Rendered as `private-user.{user_id}.tasks`.
    UserTasks { user_id: String },
    /// An arbitrary channel name, rendered unchanged.
    Custom(String),
}

impl SocketChannel {
    /// Creates a [`SocketChannel::Custom`] from an arbitrary channel name.
    pub fn custom(channel: impl Into<String>) -> Self {
        let channel = channel.into();
        Self::Custom(channel)
    }

    /// Creates a [`SocketChannel::Job`] for the given job id.
    pub fn job(job_id: impl Into<String>) -> Self {
        let job_id = job_id.into();
        Self::Job { job_id }
    }

    /// Creates a [`SocketChannel::JobTasks`] for the given job id.
    pub fn job_tasks(job_id: impl Into<String>) -> Self {
        let job_id = job_id.into();
        Self::JobTasks { job_id }
    }

    /// Creates a [`SocketChannel::Task`] for the given task id.
    pub fn task(task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self::Task { task_id }
    }

    /// Creates a [`SocketChannel::UserJobs`] for the given user id.
    pub fn user_jobs(user_id: impl Into<String>) -> Self {
        let user_id = user_id.into();
        Self::UserJobs { user_id }
    }

    /// Creates a [`SocketChannel::UserTasks`] for the given user id.
    pub fn user_tasks(user_id: impl Into<String>) -> Self {
        let user_id = user_id.into();
        Self::UserTasks { user_id }
    }

    /// Returns the channel name as a string.
    ///
    /// Custom channels are borrowed from `self`; every other variant formats
    /// a new `private-…` string, hence the [`Cow`] return type.
    pub fn name(&self) -> Cow<'_, str> {
        let formatted = match self {
            // The only variant that can be returned without allocating.
            Self::Custom(channel) => return Cow::Borrowed(channel.as_str()),
            Self::Job { job_id } => format!("private-job.{job_id}"),
            Self::JobTasks { job_id } => format!("private-job.{job_id}.tasks"),
            Self::Task { task_id } => format!("private-task.{task_id}"),
            Self::UserJobs { user_id } => format!("private-user.{user_id}.jobs"),
            Self::UserTasks { user_id } => format!("private-user.{user_id}.tasks"),
        };
        Cow::Owned(formatted)
    }

    /// Returns `true` for [`SocketChannel::Job`].
    pub fn is_job(&self) -> bool {
        matches!(*self, Self::Job { .. })
    }

    /// Returns `true` for [`SocketChannel::JobTasks`].
    pub fn is_job_tasks(&self) -> bool {
        matches!(*self, Self::JobTasks { .. })
    }

    /// Returns `true` for [`SocketChannel::Task`].
    pub fn is_task(&self) -> bool {
        matches!(*self, Self::Task { .. })
    }

    /// Returns `true` for [`SocketChannel::UserJobs`].
    pub fn is_user_jobs(&self) -> bool {
        matches!(*self, Self::UserJobs { .. })
    }

    /// Returns `true` for [`SocketChannel::UserTasks`].
    pub fn is_user_tasks(&self) -> bool {
        matches!(*self, Self::UserTasks { .. })
    }

    /// Returns the job id for the `Job` and `JobTasks` variants, otherwise `None`.
    pub fn job_id(&self) -> Option<&str> {
        if let Self::Job { job_id } | Self::JobTasks { job_id } = self {
            Some(job_id.as_str())
        } else {
            None
        }
    }

    /// Returns the task id for the `Task` variant, otherwise `None`.
    pub fn task_id(&self) -> Option<&str> {
        if let Self::Task { task_id } = self {
            Some(task_id.as_str())
        } else {
            None
        }
    }

    /// Returns the user id for the `UserJobs` and `UserTasks` variants, otherwise `None`.
    pub fn user_id(&self) -> Option<&str> {
        if let Self::UserJobs { user_id } | Self::UserTasks { user_id } = self {
            Some(user_id.as_str())
        } else {
            None
        }
    }
}
122
/// The job-related socket events this module knows by name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum JobSocketEvent {
    /// `job.created`
    Created,
    /// `job.updated`
    Updated,
    /// `job.finished`
    Finished,
    /// `job.failed`
    Failed,
}

impl JobSocketEvent {
    /// Every variant, used to resolve names back to variants.
    const ALL: [Self; 4] = [Self::Created, Self::Updated, Self::Finished, Self::Failed];

    /// Returns the event name string for this variant.
    pub fn name(self) -> &'static str {
        match self {
            Self::Failed => "job.failed",
            Self::Finished => "job.finished",
            Self::Updated => "job.updated",
            Self::Created => "job.created",
        }
    }

    /// Parses an event name, returning `None` when it is not one of the
    /// names produced by [`JobSocketEvent::name`].
    pub fn from_name(name: &str) -> Option<Self> {
        // `name()` is the single source of truth for the string form.
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.name() == name)
    }
}
152
/// The task-related socket events this module knows by name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum TaskSocketEvent {
    /// `task.created`
    Created,
    /// `task.updated`
    Updated,
    /// `task.finished`
    Finished,
    /// `task.failed`
    Failed,
}

impl TaskSocketEvent {
    /// Every variant, used to resolve names back to variants.
    const ALL: [Self; 4] = [Self::Created, Self::Updated, Self::Finished, Self::Failed];

    /// Returns the event name string for this variant.
    pub fn name(self) -> &'static str {
        match self {
            Self::Failed => "task.failed",
            Self::Finished => "task.finished",
            Self::Updated => "task.updated",
            Self::Created => "task.created",
        }
    }

    /// Parses an event name, returning `None` when it is not one of the
    /// names produced by [`TaskSocketEvent::name`].
    pub fn from_name(name: &str) -> Option<Self> {
        // `name()` is the single source of truth for the string form.
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.name() == name)
    }
}
182
183#[derive(Clone, Debug, Eq, PartialEq)]
184#[non_exhaustive]
185pub enum SocketEventKind {
186 Job(JobSocketEvent),
187 Task(TaskSocketEvent),
188 Other(String),
189}
190
191impl SocketEventKind {
192 pub fn from_name(name: impl Into<String>) -> Self {
193 let name = name.into();
194 if let Some(event) = JobSocketEvent::from_name(&name) {
195 return Self::Job(event);
196 }
197 if let Some(event) = TaskSocketEvent::from_name(&name) {
198 return Self::Task(event);
199 }
200 Self::Other(name)
201 }
202
203 pub fn name(&self) -> &str {
204 match self {
205 Self::Job(event) => event.name(),
206 Self::Task(event) => event.name(),
207 Self::Other(event) => event.as_str(),
208 }
209 }
210
211 pub fn is_job(&self) -> bool {
212 matches!(self, Self::Job(_))
213 }
214
215 pub fn is_task(&self) -> bool {
216 matches!(self, Self::Task(_))
217 }
218}
219
220#[derive(Clone, Serialize)]
221pub struct SocketSubscription {
222 channel: String,
223 auth: SocketAuth,
224}
225
226impl SocketSubscription {
227 pub(crate) fn new(channel: impl Into<String>, api_key: &str) -> Self {
228 Self {
229 channel: channel.into(),
230 auth: SocketAuth::bearer(api_key),
231 }
232 }
233
234 pub fn channel(&self) -> &str {
235 self.channel.as_str()
236 }
237}
238
239impl fmt::Debug for SocketSubscription {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 f.debug_struct("SocketSubscription")
242 .field("channel", &self.channel)
243 .field("auth", &"REDACTED")
244 .finish()
245 }
246}
247
248#[derive(Clone, Serialize)]
249pub struct SocketAuth {
250 headers: BTreeMap<String, String>,
251}
252
253impl SocketAuth {
254 fn bearer(api_key: &str) -> Self {
255 let mut headers = BTreeMap::new();
256 headers.insert("Authorization".to_string(), format!("Bearer {api_key}"));
257 Self { headers }
258 }
259}
260
261impl fmt::Debug for SocketAuth {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.write_str("SocketAuth(REDACTED)")
264 }
265}
266
/// Returns the Socket.IO base URL: the sandbox host when `sandbox` is `true`,
/// the regular host otherwise.
pub fn socket_base_url(sandbox: bool) -> &'static str {
    const LIVE_URL: &str = "https://socketio.cloudconvert.com";
    const SANDBOX_URL: &str = "https://socketio.sandbox.cloudconvert.com";

    if sandbox { SANDBOX_URL } else { LIVE_URL }
}
274
/// An event received from the socket together with its decoded payload.
#[cfg(feature = "socket")]
#[derive(Clone, Debug)]
pub struct SocketEvent {
    /// Event name, for example `job.finished`.
    event: String,
    /// Channel name, present when the payload was a `[channel, data]` pair.
    channel: Option<String>,
    /// Payload converted to JSON.
    data: Value,
}

#[cfg(feature = "socket")]
impl SocketEvent {
    /// Returns the event name.
    pub fn event(&self) -> &str {
        &self.event
    }

    /// Returns the channel name, if the payload carried one.
    pub fn channel(&self) -> Option<&str> {
        self.channel.as_deref()
    }

    /// Returns the JSON payload.
    pub fn data(&self) -> &Value {
        &self.data
    }

    /// Classifies the event name as a job event, task event, or other.
    pub fn kind(&self) -> SocketEventKind {
        SocketEventKind::from_name(self.event.as_str())
    }

    /// Returns the matching [`JobSocketEvent`] when the name is a known job event.
    pub fn job_event(&self) -> Option<JobSocketEvent> {
        JobSocketEvent::from_name(self.event())
    }

    /// Returns the matching [`TaskSocketEvent`] when the name is a known task event.
    pub fn task_event(&self) -> Option<TaskSocketEvent> {
        TaskSocketEvent::from_name(self.event())
    }

    /// Returns `true` when the event name starts with `job.`.
    ///
    /// This is a prefix check only, so it also holds for `job.*` names that
    /// [`SocketEvent::job_event`] does not recognise.
    pub fn is_job_event(&self) -> bool {
        self.event().starts_with("job.")
    }

    /// Returns `true` when the event name starts with `task.`.
    ///
    /// This is a prefix check only, so it also holds for `task.*` names that
    /// [`SocketEvent::task_event`] does not recognise.
    pub fn is_task_event(&self) -> bool {
        self.event().starts_with("task.")
    }

    /// Returns `true` when the event name ends with `.finished`.
    pub fn is_finished(&self) -> bool {
        self.event().ends_with(".finished")
    }

    /// Returns `true` when the event name ends with `.created`.
    pub fn is_created(&self) -> bool {
        self.event().ends_with(".created")
    }

    /// Returns `true` when the event name ends with `.updated`.
    pub fn is_updated(&self) -> bool {
        self.event().ends_with(".updated")
    }

    /// Returns `true` when the event name ends with `.failed`.
    pub fn is_failed(&self) -> bool {
        self.event().ends_with(".failed")
    }

    /// Returns `true` for `.finished` and `.failed` events.
    pub fn is_terminal(&self) -> bool {
        self.is_failed() || self.is_finished()
    }

    /// Decodes the `job` field of the payload.
    ///
    /// Returns `Ok(None)` when the payload has no `job` field.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Json`] when the field is present but cannot be
    /// deserialized into a [`Job`].
    pub fn job(&self) -> Result<Option<Job>> {
        decode_socket_data_field(self.data(), "job")
    }

    /// Decodes the `task` field of the payload.
    ///
    /// Returns `Ok(None)` when the payload has no `task` field.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Json`] when the field is present but cannot be
    /// deserialized into a [`Task`].
    pub fn task(&self) -> Result<Option<Task>> {
        decode_socket_data_field(self.data(), "task")
    }

    /// Builds an event from a raw Socket.IO payload.
    ///
    /// * `Payload::Text` is split into an optional channel and the data by
    ///   `split_socket_payload_values`.
    /// * `Payload::String` is parsed as JSON, falling back to a JSON string
    ///   holding the raw text when parsing fails.
    /// * `Payload::Binary` becomes a JSON array with one number per byte.
    // The `deprecated` lint is silenced because every `Payload` variant has to
    // be matched here; presumably `Payload::String` is the deprecated one.
    #[allow(deprecated)]
    fn from_payload(event: impl Into<String>, payload: Payload) -> Self {
        let (channel, data) = match payload {
            Payload::Binary(bytes) => {
                let numbers: Vec<Value> = bytes.iter().copied().map(Value::from).collect();
                (None, Value::Array(numbers))
            }
            Payload::String(raw) => {
                let parsed = match serde_json::from_str::<Value>(&raw) {
                    Ok(json) => json,
                    // Not valid JSON: keep the text itself as the payload.
                    Err(_) => Value::String(raw),
                };
                (None, parsed)
            }
            Payload::Text(values) => split_socket_payload_values(values),
        };
        let event = event.into();

        Self {
            event,
            channel,
            data,
        }
    }
}
371
/// A connected Socket.IO client paired with the receiving end of the channel
/// that its event callbacks forward [`SocketEvent`]s into.
#[cfg(feature = "socket")]
pub struct CloudConvertSocket {
    /// Underlying Socket.IO client handle.
    client: SocketIoClient,
    /// Receives the events forwarded by the registered callbacks.
    receiver: mpsc::Receiver<SocketEvent>,
}

#[cfg(feature = "socket")]
impl fmt::Debug for CloudConvertSocket {
    /// Formats the socket using fixed placeholder strings for both fields
    /// instead of their actual contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut output = f.debug_struct("CloudConvertSocket");
        output.field("client", &"Socket.IO client");
        output.field("receiver", &"event receiver");
        output.finish()
    }
}
388
#[cfg(feature = "socket")]
impl CloudConvertSocket {
    /// Connects to `base_url` and subscribes to every item of `subscriptions`,
    /// using an event buffer of 64 entries.
    ///
    /// # Errors
    ///
    /// See [`CloudConvertSocket::connect_with_buffer`].
    pub async fn connect(
        base_url: impl Into<String>,
        subscriptions: impl IntoIterator<Item = SocketSubscription>,
    ) -> Result<Self> {
        Self::connect_with_buffer(base_url, subscriptions, 64).await
    }

    /// Connects to `base_url` and subscribes to every item of `subscriptions`.
    ///
    /// `buffer` is the capacity of the channel that carries received events to
    /// [`CloudConvertSocket::next_event`]; a value of `0` is raised to `1`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Socket`] when the connection cannot be established or
    /// a subscription cannot be emitted, and a JSON error when a subscription
    /// cannot be serialized. If a subscription fails after the connection was
    /// established, the client is disconnected before the error is returned.
    pub async fn connect_with_buffer(
        base_url: impl Into<String>,
        subscriptions: impl IntoIterator<Item = SocketSubscription>,
        buffer: usize,
    ) -> Result<Self> {
        let (sender, receiver) = mpsc::channel(buffer.max(1));
        let client = socket_client_builder(base_url.into(), sender)
            .connect()
            .await
            .map_err(socket_error)?;
        let socket = Self { client, receiver };

        for subscription in subscriptions {
            if let Err(error) = socket.subscribe(subscription).await {
                // The caller never receives the socket on this path, so nobody
                // else could disconnect it. The client was built with
                // reconnection enabled, so tear it down here on a best-effort
                // basis; the subscription error is the one worth reporting.
                // NOTE(review): assumes the client keeps running after this
                // handle is dropped — confirm against rust_socketio.
                let _ = socket.disconnect().await;
                return Err(error);
            }
        }

        Ok(socket)
    }

    /// Serializes `subscription` to JSON and emits it with the `subscribe` event.
    ///
    /// # Errors
    ///
    /// Returns a JSON error when serialization fails and [`Error::Socket`]
    /// when the client fails to emit the event.
    pub async fn subscribe(&self, subscription: SocketSubscription) -> Result<()> {
        let payload = serde_json::to_value(subscription)?;
        self.client
            .emit("subscribe", payload)
            .await
            .map_err(socket_error)
    }

    /// Waits for the next event forwarded by the socket callbacks.
    ///
    /// Returns whatever the underlying channel's `recv` yields; per the tokio
    /// documentation that is `None` once the channel is closed and drained.
    pub async fn next_event(&mut self) -> Option<SocketEvent> {
        self.receiver.recv().await
    }

    /// Disconnects the underlying Socket.IO client.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Socket`] when the client reports a failure.
    pub async fn disconnect(&self) -> Result<()> {
        self.client.disconnect().await.map_err(socket_error)
    }
}
433
/// Creates a Socket.IO client builder for `base_url` with reconnection
/// enabled and one forwarding callback registered per known job and task
/// event name.
///
/// Every callback receives its own clone of `sender`; the original `sender`
/// is dropped when this function returns.
#[cfg(feature = "socket")]
fn socket_client_builder(
    base_url: String,
    sender: mpsc::Sender<SocketEvent>,
) -> SocketIoClientBuilder {
    // Registration order: all job events first, then all task events.
    let event_names = [
        JobSocketEvent::Created.name(),
        JobSocketEvent::Updated.name(),
        JobSocketEvent::Finished.name(),
        JobSocketEvent::Failed.name(),
        TaskSocketEvent::Created.name(),
        TaskSocketEvent::Updated.name(),
        TaskSocketEvent::Finished.name(),
        TaskSocketEvent::Failed.name(),
    ];

    let base = SocketIoClientBuilder::new(base_url)
        .reconnect(true)
        .reconnect_on_disconnect(true);

    event_names.iter().copied().fold(base, |builder, name| {
        builder.on(name, socket_event_callback(name, sender.clone()))
    })
}
458
/// Builds the callback registered for `event`.
///
/// Each invocation converts the incoming payload into a [`SocketEvent`] named
/// `event` and sends it through `sender`. A failed send is ignored.
#[cfg(feature = "socket")]
fn socket_event_callback(
    event: &'static str,
    sender: mpsc::Sender<SocketEvent>,
) -> impl FnMut(Payload, SocketIoClient) -> BoxFuture<'static, ()> + Send + Sync + 'static {
    move |payload, _client| {
        // The returned future must be `'static`, so it gets its own sender.
        let forwarder = sender.clone();
        let forward = async move {
            let received = SocketEvent::from_payload(event, payload);
            // Delivery is best-effort: a send error is deliberately dropped.
            let _ = forwarder.send(received).await;
        };
        forward.boxed()
    }
}
472
/// Splits the JSON values of a text payload into an optional channel name and
/// the event data.
///
/// * `[string, data]` returns the string as the channel and `data` as the data.
/// * A single value returns no channel and that value as the data.
/// * Anything else (including an empty list, or two values whose first is not
///   a string) returns no channel and all values wrapped in a JSON array.
///
/// The vector is owned, so the selected values are moved out of it rather
/// than deep-cloned.
#[cfg(feature = "socket")]
fn split_socket_payload_values(mut values: Vec<Value>) -> (Option<String>, Value) {
    match values.as_mut_slice() {
        // `take` leaves an empty string / `Null` behind in the vector, which
        // is dropped right after this function returns.
        [Value::String(channel), data] => (Some(std::mem::take(channel)), data.take()),
        [only] => (None, only.take()),
        _ => (None, Value::Array(values)),
    }
}
481
/// Deserializes `data[field]` into `T`.
///
/// Returns `Ok(None)` when `data.get(field)` yields nothing.
///
/// # Errors
///
/// Returns [`Error::Json`] when the field exists but does not deserialize
/// into `T`.
#[cfg(feature = "socket")]
fn decode_socket_data_field<T>(data: &Value, field: &'static str) -> Result<Option<T>>
where
    T: DeserializeOwned,
{
    data.get(field)
        .map(|raw| serde_json::from_value(raw.clone()).map_err(Error::Json))
        .transpose()
}
495
/// Converts any displayable error into [`Error::Socket`], keeping only its
/// rendered message.
#[cfg(feature = "socket")]
fn socket_error(error: impl fmt::Display) -> Error {
    let message = error.to_string();
    Error::Socket(message)
}
500
#[cfg(all(test, feature = "socket"))]
mod managed_socket_tests {
    use super::*;
    use std::time::Duration;

    use serde_json::json;

    /// Builds the `{"task": {...}}` payload object shared by the task tests.
    fn task_payload(status: &str) -> Value {
        json!({
            "task": {
                "id": "task_1",
                "job_id": "job_1",
                "operation": "convert",
                "status": status
            }
        })
    }

    #[test]
    fn socket_event_decodes_channel_and_job_payload() {
        // A `[channel, data]` pair: the first element becomes the channel.
        let job_data = json!({
            "job": {
                "id": "job_1",
                "status": "finished",
                "tasks": []
            }
        });
        let payload = Payload::Text(vec![json!("private-job.job_1"), job_data]);

        let finished = SocketEvent::from_payload("job.finished", payload);

        assert_eq!(finished.event(), "job.finished");
        assert_eq!(finished.channel(), Some("private-job.job_1"));
        assert_eq!(
            finished.kind(),
            SocketEventKind::Job(JobSocketEvent::Finished)
        );
        assert_eq!(finished.job_event(), Some(JobSocketEvent::Finished));
        assert!(finished.is_job_event());
        assert!(finished.is_finished());
        assert!(finished.is_terminal());
        assert_eq!(finished.job().unwrap().unwrap().id, "job_1");
    }

    #[test]
    fn socket_event_preserves_unknown_payload_shapes() {
        // A single value is kept as the data, with no channel.
        let payload = Payload::Text(vec![json!({"unexpected": true})]);

        let updated = SocketEvent::from_payload("task.updated", payload);

        assert_eq!(updated.channel(), None);
        assert_eq!(updated.data()["unexpected"], true);
        assert_eq!(updated.task_event(), Some(TaskSocketEvent::Updated));
        assert!(updated.is_updated());
        assert!(updated.task().unwrap().is_none());
    }

    #[test]
    fn socket_event_decodes_task_payload_and_status_helpers() {
        // The payload is built from the JSON text of the task object.
        let payload = Payload::from(task_payload("error").to_string());

        let failed = SocketEvent::from_payload("task.failed", payload);

        assert_eq!(failed.event(), "task.failed");
        assert_eq!(failed.channel(), None);
        assert_eq!(
            failed.kind(),
            SocketEventKind::Task(TaskSocketEvent::Failed)
        );
        assert_eq!(failed.task_event(), Some(TaskSocketEvent::Failed));
        assert_eq!(failed.job_event(), None);
        assert!(failed.is_task_event());
        assert!(!failed.is_job_event());
        assert!(failed.is_failed());
        assert!(failed.is_terminal());
        assert_eq!(failed.task().unwrap().unwrap().id, "task_1");
        assert!(failed.job().unwrap().is_none());
    }

    #[test]
    fn socket_event_handles_created_updated_binary_and_unknown_payloads() {
        // Created event with a `[channel, data]` pair.
        let created_payload = Payload::Text(vec![
            json!("private-task.task_1"),
            task_payload("waiting"),
        ]);
        let created_event = SocketEvent::from_payload("task.created", created_payload);
        assert_eq!(created_event.channel(), Some("private-task.task_1"));
        assert!(created_event.is_created());
        assert!(!created_event.is_terminal());

        // Binary payloads turn into an array of byte values.
        let binary_payload = Payload::Binary(bytes::Bytes::from_static(&[1, 2, 3]));
        let binary_event = SocketEvent::from_payload("job.updated", binary_payload);
        assert_eq!(binary_event.data(), &json!([1, 2, 3]));
        assert!(binary_event.is_updated());

        // Text that is not JSON is kept as a JSON string.
        let string_event =
            SocketEvent::from_payload("job.created", Payload::from("raw".to_string()));
        assert_eq!(string_event.data(), &json!("raw"));
        assert!(string_event.is_created());

        // Three values do not match any known shape and stay an array.
        let unknown_payload = Payload::Text(vec![json!("one"), json!("two"), json!("three")]);
        let unknown_event = SocketEvent::from_payload("custom.event", unknown_payload);
        assert_eq!(
            unknown_event.kind(),
            SocketEventKind::Other("custom.event".to_string())
        );
        assert_eq!(unknown_event.kind().name(), "custom.event");
        assert!(!unknown_event.kind().is_job());
        assert!(!unknown_event.kind().is_task());
        assert!(!unknown_event.is_job_event());
        assert!(!unknown_event.is_task_event());
        assert_eq!(unknown_event.data(), &json!(["one", "two", "three"]));
    }

    #[test]
    fn socket_event_reports_json_decode_errors_for_bad_payload_fields() {
        // The `job` field exists but is a string rather than an object.
        let payload = Payload::Text(vec![json!({
            "job": "not a job object"
        })]);

        let malformed = SocketEvent::from_payload("job.finished", payload);

        assert!(matches!(malformed.job().unwrap_err(), Error::Json(_)));
    }

    #[tokio::test]
    async fn socket_connect_reports_socket_errors_for_unavailable_local_endpoint() {
        // The future is lazy; nothing is attempted until the timeout polls it.
        let attempt =
            CloudConvertSocket::connect("http://127.0.0.1:1", Vec::<SocketSubscription>::new());

        let outcome = tokio::time::timeout(Duration::from_secs(2), attempt)
            .await
            .expect("local refused socket connection should finish quickly");

        assert!(matches!(outcome, Err(Error::Socket(_))));
    }
}