Skip to main content

conduit_rs/
matching.rs

1//! Matching workflow types and resource methods.
2
3use crate::common::{path_segment, require_non_empty};
4use crate::error::{ConduitError, Result};
5use crate::model::{
6    Job, JobEvent, JobStage, Matching, ReceiptStatus, parse_job_receipt, parse_matching,
7};
8use crate::primitives::{ActionOptions, JobsResource, StreamOptions, WaitOptions};
9use crate::reports::{Target, WebhookEndpoint, normalize_target, normalize_webhook};
10use crate::transport::{RequestBody, RequestOptions, Transport};
11use futures_util::stream::BoxStream;
12use reqwest::Method;
13use serde_json::{Map, Value};
14use std::collections::HashSet;
15use std::sync::Arc;
16use std::time::Duration;
17
/// Matching contexts accepted by the public API.
///
/// Each variant maps to a fixed wire identifier, available through
/// [`MatchingContext::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatchingContext {
    /// Behavioral compatibility analysis.
    BehavioralCompatibility,
}
24
25impl MatchingContext {
26    /// Returns the canonical API identifier for the context.
27    pub fn as_str(self) -> &'static str {
28        match self {
29            Self::BehavioralCompatibility => "behavioral_compatibility",
30        }
31    }
32
33    pub(crate) fn parse(value: &str, name: &str) -> Result<Self> {
34        match value {
35            "behavioral_compatibility" => Ok(Self::BehavioralCompatibility),
36            _ => Err(ConduitError::invalid_response(format!(
37                "invalid {name}: unsupported matching context"
38            ))),
39        }
40    }
41}
42
43#[derive(Debug, Clone)]
44/// Reference to a matching subject.
45pub enum SubjectRef {
46    /// Reference an existing stable entity.
47    Entity {
48        /// Stable entity identifier.
49        entity_id: String,
50    },
51    /// Reference a selected speaker inside an uploaded media file.
52    Media {
53        /// Uploaded media identifier.
54        media_id: String,
55        /// Speaker selection strategy applied to the media.
56        selector: Target,
57    },
58}
59
60impl SubjectRef {
61    /// Creates a subject reference for a stable entity.
62    pub fn entity(entity_id: impl Into<String>) -> Self {
63        Self::Entity {
64            entity_id: entity_id.into(),
65        }
66    }
67
68    /// Creates a subject reference for a media target selected by [`Target`].
69    pub fn media(media_id: impl Into<String>, selector: Target) -> Self {
70        Self::Media {
71            media_id: media_id.into(),
72            selector,
73        }
74    }
75}
76
77#[derive(Debug, Clone)]
78/// Request payload for [`MatchingResource::create`].
79pub struct MatchingCreate {
80    /// Matching context to evaluate.
81    pub context: MatchingContext,
82    /// Subject being evaluated.
83    pub target: SubjectRef,
84    /// Comparison group. Must contain at least one subject.
85    pub group: Vec<SubjectRef>,
86    /// Optional completion webhook destination.
87    pub webhook: Option<WebhookEndpoint>,
88    /// Optional idempotency key applied to the create request.
89    pub idempotency_key: Option<String>,
90    /// Optional request identifier echoed by the API.
91    pub request_id: Option<String>,
92}
93
94impl MatchingCreate {
95    /// Creates a new matching request.
96    pub fn new(context: MatchingContext, target: SubjectRef, group: Vec<SubjectRef>) -> Self {
97        Self {
98            context,
99            target,
100            group,
101            webhook: None,
102            idempotency_key: None,
103            request_id: None,
104        }
105    }
106
107    /// Attaches a completion webhook to the request.
108    pub fn webhook(mut self, webhook: WebhookEndpoint) -> Self {
109        self.webhook = Some(webhook);
110        self
111    }
112
113    /// Sets a caller-supplied idempotency key.
114    pub fn idempotency_key(mut self, value: impl Into<String>) -> Self {
115        self.idempotency_key = Some(value.into());
116        self
117    }
118
119    /// Sets a caller-supplied request identifier.
120    pub fn request_id(mut self, value: impl Into<String>) -> Self {
121        self.request_id = Some(value.into());
122        self
123    }
124}
125
126#[derive(Debug, Clone)]
127/// Receipt returned immediately after a matching job is accepted.
128pub struct MatchingReceipt {
129    /// Accepted job identifier.
130    pub job_id: String,
131    /// Initial receipt status reported by the API.
132    pub status: ReceiptStatus,
133    /// Helper for polling, waiting, and canceling the job.
134    pub handle: MatchingHandle,
135    /// Current job stage, when available.
136    pub stage: Option<JobStage>,
137    /// Advisory estimated wait time, in seconds.
138    pub estimated_wait_sec: Option<f64>,
139}
140
141#[derive(Debug, Clone)]
142/// Polling and convenience helpers associated with a matching receipt.
143pub struct MatchingHandle {
144    job_id: String,
145    jobs: JobsResource,
146    matching: MatchingResource,
147}
148
149#[derive(Debug, Clone)]
150/// Matching API resource group.
151pub struct MatchingResource {
152    transport: Arc<Transport>,
153    jobs: JobsResource,
154}
155
156impl MatchingResource {
157    pub(crate) fn new(transport: Arc<Transport>, jobs: JobsResource) -> Self {
158        Self { transport, jobs }
159    }
160
161    /// Creates a matching job.
162    ///
163    /// This method validates the canonical subject references, creates the job, and returns a
164    /// receipt without waiting for the matching result.
165    pub async fn create(&self, request: MatchingCreate) -> Result<MatchingReceipt> {
166        if request.group.is_empty() {
167            return Err(ConduitError::invalid_request(
168                "group must contain at least one subject",
169            ));
170        }
171
172        let target = normalize_subject(&request.target)?;
173        let mut group = Vec::with_capacity(request.group.len());
174        for subject in &request.group {
175            group.push(normalize_subject(subject)?);
176        }
177        ensure_unique_direct_entity_ids(&target, &group)?;
178
179        let mut body = Map::new();
180        body.insert(
181            "context".to_string(),
182            Value::String(request.context.as_str().to_string()),
183        );
184        body.insert("target".to_string(), target);
185        body.insert("group".to_string(), Value::Array(group));
186        if let Some(webhook) = request.webhook.as_ref() {
187            body.insert("webhook".to_string(), normalize_webhook(webhook)?);
188        }
189
190        let response = self
191            .transport
192            .request(
193                Method::POST,
194                "/v1/matching/jobs",
195                RequestOptions {
196                    body: RequestBody::Json(Value::Object(body)),
197                    idempotency_key: Some(
198                        request
199                            .idempotency_key
200                            .unwrap_or_else(|| crate::common::random_id("idem")),
201                    ),
202                    request_id: request.request_id,
203                    retryable: true,
204                    ..RequestOptions::default()
205                },
206            )
207            .await?;
208        let receipt = parse_job_receipt(&response.body)?;
209        Ok(MatchingReceipt {
210            job_id: receipt.job_id.clone(),
211            status: receipt.status,
212            stage: receipt.stage,
213            estimated_wait_sec: receipt.estimated_wait_sec,
214            handle: MatchingHandle {
215                job_id: receipt.job_id,
216                jobs: self.jobs.clone(),
217                matching: self.clone(),
218            },
219        })
220    }
221
222    /// Fetches a completed matching result by identifier.
223    pub async fn get(&self, matching_id: &str) -> Result<Matching> {
224        let response = self
225            .transport
226            .request(
227                Method::GET,
228                &format!("/v1/matching/{}", path_segment(matching_id, "matching_id")?),
229                RequestOptions {
230                    retryable: true,
231                    ..RequestOptions::default()
232                },
233            )
234            .await?;
235        parse_matching(&response.body)
236    }
237
238    /// Fetches the completed matching result associated with a job, if one exists yet.
239    pub async fn get_by_job(&self, job_id: &str) -> Result<Option<Matching>> {
240        let response = self
241            .transport
242            .request(
243                Method::GET,
244                &format!("/v1/matching/by-job/{}", path_segment(job_id, "job_id")?),
245                RequestOptions {
246                    retryable: true,
247                    ..RequestOptions::default()
248                },
249            )
250            .await?;
251        if response.body.is_empty() || String::from_utf8_lossy(&response.body).trim() == "null" {
252            return Ok(None);
253        }
254        Ok(Some(parse_matching(&response.body)?))
255    }
256}
257
258impl MatchingHandle {
259    /// Polls the job and yields status, stage, and terminal events.
260    pub fn stream(&self) -> BoxStream<'static, Result<JobEvent>> {
261        self.jobs
262            .stream(self.job_id.clone(), StreamOptions::default())
263    }
264
265    /// Polls the job using custom stream options.
266    pub fn stream_with(&self, options: StreamOptions) -> BoxStream<'static, Result<JobEvent>> {
267        self.jobs.stream(self.job_id.clone(), options)
268    }
269
270    /// Waits for a terminal job state using default polling settings.
271    pub async fn wait(&self) -> Result<Matching> {
272        self.wait_with(WaitOptions::default()).await
273    }
274
275    /// Waits for matching completion with a custom timeout.
276    pub async fn wait_for(&self, timeout: Duration) -> Result<Matching> {
277        self.wait_with(WaitOptions::default().timeout(timeout))
278            .await
279    }
280
281    /// Waits for matching completion using custom polling options.
282    pub async fn wait_with(&self, options: WaitOptions) -> Result<Matching> {
283        let job = self.jobs.wait(&self.job_id, options).await?;
284        match job.matching_id {
285            Some(matching_id) => self.matching.get(&matching_id).await,
286            None => Err(ConduitError::invalid_response(format!(
287                "job {} succeeded but no matchingId was returned",
288                self.job_id
289            ))),
290        }
291    }
292
293    /// Requests cancellation for the underlying job.
294    pub async fn cancel(&self) -> Result<Job> {
295        self.jobs.cancel(&self.job_id).await
296    }
297
298    /// Requests cancellation using custom action options.
299    pub async fn cancel_with(&self, options: ActionOptions) -> Result<Job> {
300        self.jobs.cancel_with(&self.job_id, options).await
301    }
302
303    /// Fetches the latest job state.
304    pub async fn job(&self) -> Result<Job> {
305        self.jobs.get(&self.job_id).await
306    }
307
308    /// Fetches the matching result if the job has already completed successfully.
309    pub async fn matching(&self) -> Result<Option<Matching>> {
310        self.matching.get_by_job(&self.job_id).await
311    }
312}
313
314fn normalize_subject(subject: &SubjectRef) -> Result<Value> {
315    let mut payload = Map::new();
316    match subject {
317        SubjectRef::Entity { entity_id } => {
318            payload.insert("type".to_string(), Value::String("entity_id".to_string()));
319            payload.insert(
320                "entityId".to_string(),
321                Value::String(require_non_empty(entity_id, "subject.entityId")?),
322            );
323        }
324        SubjectRef::Media { media_id, selector } => {
325            payload.insert(
326                "type".to_string(),
327                Value::String("media_target".to_string()),
328            );
329            payload.insert(
330                "mediaId".to_string(),
331                Value::String(require_non_empty(media_id, "subject.mediaId")?),
332            );
333            payload.insert("selector".to_string(), normalize_target(selector)?);
334        }
335    }
336    Ok(Value::Object(payload))
337}
338
339fn ensure_unique_direct_entity_ids(target: &Value, group: &[Value]) -> Result<()> {
340    let mut seen = HashSet::new();
341    for subject in std::iter::once(target).chain(group.iter()) {
342        let Some(object) = subject.as_object() else {
343            continue;
344        };
345        let Some(entity_id) = object.get("entityId").and_then(Value::as_str) else {
346            continue;
347        };
348        if object.get("mediaId").is_some() {
349            continue;
350        }
351        if !seen.insert(entity_id.to_string()) {
352            return Err(ConduitError::invalid_request(
353                "target and group must reference different direct entity IDs",
354            ));
355        }
356    }
357    Ok(())
358}