1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
mod description;
mod documentation;

pub use description::McaiWorkerDescription;
pub use documentation::McaiWorkerDocumentation;

use crate::prelude::*;
use schemars::{schema::RootSchema, schema_for, JsonSchema};
use serde::de::DeserializeOwned;

// #[cfg(feature = "media")]
// use crate::{FormatContext, ProcessFrame, ProcessResult, StreamDescriptor};
#[cfg(feature = "media")]
use crate::media::{DESTINATION_PATH_PARAMETER, SOURCE_PATH_PARAMETER};
#[cfg(feature = "media")]
use schemars::schema::{InstanceType::Object, SingleOrVec::Single};
#[cfg(feature = "media")]
use std::{
  ops::Deref,
  sync::{mpsc::Sender, Arc, Mutex},
};

/// Contract implemented by every worker.
///
/// `P` is the type of the job parameters handed to the processing methods; it
/// must be deserializable and able to describe itself as a JSON schema.
/// `D` is the type providing the worker description.
///
/// Every method has a default implementation, so an implementor only
/// overrides the ones it needs.
pub trait McaiWorker<P: DeserializeOwned + JsonSchema, D: McaiWorkerDescription> {
  /// Returns a boxed, default-constructed worker description of type `D`.
  fn get_mcai_worker_description(&self) -> Box<D> {
    Box::default()
  }

  /// Builds the JSON schema of the parameters type `P`.
  ///
  /// # Errors
  ///
  /// With the "media" feature enabled, when the root schema is a single
  /// `Object` instance type carrying object validation, a
  /// `MessageError::ParameterValueError` is returned if its properties lack
  /// `SOURCE_PATH_PARAMETER` or `DESTINATION_PATH_PARAMETER` (checked in that
  /// order).
  fn get_parameters_schema(&self) -> Result<RootSchema> {
    let schema: RootSchema = schema_for!(P);

    #[cfg(feature = "media")]
    {
      let root = &schema.schema;
      // Only a schema whose instance type is exactly one `Object` is inspected;
      // any other shape is returned unchecked.
      let describes_object =
        matches!(&root.instance_type, Some(Single(kind)) if matches!(kind.deref(), Object));

      if describes_object {
        if let Some(validation) = &root.object {
          let properties = &validation.properties;

          if !properties.contains_key(SOURCE_PATH_PARAMETER) {
            return Err(MessageError::ParameterValueError(format!(
              "Expected media parameter missing: '{SOURCE_PATH_PARAMETER}'"
            )));
          }

          if !properties.contains_key(DESTINATION_PATH_PARAMETER) {
            return Err(MessageError::ParameterValueError(format!(
              "Expected media parameter missing: '{DESTINATION_PATH_PARAMETER}'"
            )));
          }
        }
      }
    }

    Ok(schema)
  }

  /// Initialization hook of the worker.
  ///
  /// The default implementation does nothing and returns `Ok(())`.
  fn init(&mut self) -> Result<()> {
    Ok(())
  }

  /// Media-only hook receiving the job parameters, the shared format context
  /// and the sender used for process results.
  ///
  /// The default implementation ignores all of its arguments and returns an
  /// empty list of stream descriptors.
  #[cfg(feature = "media")]
  fn init_process(
    &mut self,
    _parameters: P,
    _format_context: Arc<Mutex<FormatContext>>,
    _response_sender: Arc<Mutex<Sender<ProcessResult>>>,
  ) -> Result<Vec<StreamDescriptor>> {
    Ok(Vec::new())
  }

  /// Media-only hook receiving a batch of frames for the stream at
  /// `_stream_index`.
  ///
  /// The default implementation always fails with
  /// `MessageError::NotImplemented`.
  #[cfg(feature = "media")]
  fn process_frames(
    &mut self,
    _job_result: JobResult,
    _stream_index: usize,
    _frames: &[ProcessFrame],
  ) -> Result<ProcessResult> {
    Err(MessageError::NotImplemented())
  }

  /// Media-only hook receiving a new set of parameters.
  ///
  /// The default implementation ignores them and returns `Ok(())`.
  #[cfg(feature = "media")]
  fn update_process(&mut self, _parameters: P) -> Result<()> {
    Ok(())
  }

  /// Media-only hook for the end of a process.
  ///
  /// The default implementation does nothing and returns `Ok(())`.
  #[cfg(feature = "media")]
  fn ending_process(&mut self) -> Result<()> {
    Ok(())
  }

  /// Tells whether the given channel reports itself as stopped.
  ///
  /// Returns `false` when no channel is provided.
  ///
  /// # Panics
  ///
  /// Panics if locking the channel returns an error.
  fn is_current_job_stopped(channel: &Option<McaiChannel>) -> bool {
    match channel {
      Some(job_channel) => job_channel.lock().unwrap().is_stopped(),
      None => false,
    }
  }

  /// Processes a job from its parameters and returns the resulting job result.
  ///
  /// Not called when the "media" feature is enabled.
  ///
  /// The default implementation always fails with
  /// `MessageError::NotImplemented`.
  fn process(
    &self,
    _channel: Option<McaiChannel>,
    _parameters: P,
    _job_result: JobResult,
  ) -> Result<JobResult>
  where
    Self: std::marker::Sized,
  {
    Err(MessageError::NotImplemented())
  }
}