1use crate::common::{path_segment, require_non_empty};
4use crate::error::{ConduitError, Result};
5use crate::model::{
6 Job, JobEvent, JobStage, Matching, ReceiptStatus, parse_job_receipt, parse_matching,
7};
8use crate::primitives::{ActionOptions, JobsResource, StreamOptions, WaitOptions};
9use crate::reports::{Target, WebhookEndpoint, normalize_target, normalize_webhook};
10use crate::transport::{RequestBody, RequestOptions, Transport};
11use futures_util::stream::BoxStream;
12use reqwest::Method;
13use serde_json::{Map, Value};
14use std::collections::HashSet;
15use std::sync::Arc;
16use std::time::Duration;
17
/// Kind of comparison a matching job is asked to perform.
///
/// The string sent to and received from the API for each variant is the one
/// returned by `MatchingContext::as_str`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MatchingContext {
    /// Represented on the wire as `"behavioral_compatibility"`.
    BehavioralCompatibility,
}
24
25impl MatchingContext {
26 pub fn as_str(self) -> &'static str {
28 match self {
29 Self::BehavioralCompatibility => "behavioral_compatibility",
30 }
31 }
32
33 pub(crate) fn parse(value: &str, name: &str) -> Result<Self> {
34 match value {
35 "behavioral_compatibility" => Ok(Self::BehavioralCompatibility),
36 _ => Err(ConduitError::invalid_response(format!(
37 "invalid {name}: unsupported matching context"
38 ))),
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
44pub enum SubjectRef {
46 Entity {
48 entity_id: String,
50 },
51 Media {
53 media_id: String,
55 selector: Target,
57 },
58}
59
60impl SubjectRef {
61 pub fn entity(entity_id: impl Into<String>) -> Self {
63 Self::Entity {
64 entity_id: entity_id.into(),
65 }
66 }
67
68 pub fn media(media_id: impl Into<String>, selector: Target) -> Self {
70 Self::Media {
71 media_id: media_id.into(),
72 selector,
73 }
74 }
75}
76
77#[derive(Debug, Clone)]
78pub struct MatchingCreate {
80 pub context: MatchingContext,
82 pub target: SubjectRef,
84 pub group: Vec<SubjectRef>,
86 pub webhook: Option<WebhookEndpoint>,
88 pub idempotency_key: Option<String>,
90 pub request_id: Option<String>,
92}
93
94impl MatchingCreate {
95 pub fn new(context: MatchingContext, target: SubjectRef, group: Vec<SubjectRef>) -> Self {
97 Self {
98 context,
99 target,
100 group,
101 webhook: None,
102 idempotency_key: None,
103 request_id: None,
104 }
105 }
106
107 pub fn webhook(mut self, webhook: WebhookEndpoint) -> Self {
109 self.webhook = Some(webhook);
110 self
111 }
112
113 pub fn idempotency_key(mut self, value: impl Into<String>) -> Self {
115 self.idempotency_key = Some(value.into());
116 self
117 }
118
119 pub fn request_id(mut self, value: impl Into<String>) -> Self {
121 self.request_id = Some(value.into());
122 self
123 }
124}
125
126#[derive(Debug, Clone)]
127pub struct MatchingReceipt {
129 pub job_id: String,
131 pub status: ReceiptStatus,
133 pub handle: MatchingHandle,
135 pub stage: Option<JobStage>,
137 pub estimated_wait_sec: Option<f64>,
139}
140
141#[derive(Debug, Clone)]
142pub struct MatchingHandle {
144 job_id: String,
145 jobs: JobsResource,
146 matching: MatchingResource,
147}
148
149#[derive(Debug, Clone)]
150pub struct MatchingResource {
152 transport: Arc<Transport>,
153 jobs: JobsResource,
154}
155
156impl MatchingResource {
157 pub(crate) fn new(transport: Arc<Transport>, jobs: JobsResource) -> Self {
158 Self { transport, jobs }
159 }
160
161 pub async fn create(&self, request: MatchingCreate) -> Result<MatchingReceipt> {
166 if request.group.is_empty() {
167 return Err(ConduitError::invalid_request(
168 "group must contain at least one subject",
169 ));
170 }
171
172 let target = normalize_subject(&request.target)?;
173 let mut group = Vec::with_capacity(request.group.len());
174 for subject in &request.group {
175 group.push(normalize_subject(subject)?);
176 }
177 ensure_unique_direct_entity_ids(&target, &group)?;
178
179 let mut body = Map::new();
180 body.insert(
181 "context".to_string(),
182 Value::String(request.context.as_str().to_string()),
183 );
184 body.insert("target".to_string(), target);
185 body.insert("group".to_string(), Value::Array(group));
186 if let Some(webhook) = request.webhook.as_ref() {
187 body.insert("webhook".to_string(), normalize_webhook(webhook)?);
188 }
189
190 let response = self
191 .transport
192 .request(
193 Method::POST,
194 "/v1/matching/jobs",
195 RequestOptions {
196 body: RequestBody::Json(Value::Object(body)),
197 idempotency_key: Some(
198 request
199 .idempotency_key
200 .unwrap_or_else(|| crate::common::random_id("idem")),
201 ),
202 request_id: request.request_id,
203 retryable: true,
204 ..RequestOptions::default()
205 },
206 )
207 .await?;
208 let receipt = parse_job_receipt(&response.body)?;
209 Ok(MatchingReceipt {
210 job_id: receipt.job_id.clone(),
211 status: receipt.status,
212 stage: receipt.stage,
213 estimated_wait_sec: receipt.estimated_wait_sec,
214 handle: MatchingHandle {
215 job_id: receipt.job_id,
216 jobs: self.jobs.clone(),
217 matching: self.clone(),
218 },
219 })
220 }
221
222 pub async fn get(&self, matching_id: &str) -> Result<Matching> {
224 let response = self
225 .transport
226 .request(
227 Method::GET,
228 &format!("/v1/matching/{}", path_segment(matching_id, "matching_id")?),
229 RequestOptions {
230 retryable: true,
231 ..RequestOptions::default()
232 },
233 )
234 .await?;
235 parse_matching(&response.body)
236 }
237
238 pub async fn get_by_job(&self, job_id: &str) -> Result<Option<Matching>> {
240 let response = self
241 .transport
242 .request(
243 Method::GET,
244 &format!("/v1/matching/by-job/{}", path_segment(job_id, "job_id")?),
245 RequestOptions {
246 retryable: true,
247 ..RequestOptions::default()
248 },
249 )
250 .await?;
251 if response.body.is_empty() || String::from_utf8_lossy(&response.body).trim() == "null" {
252 return Ok(None);
253 }
254 Ok(Some(parse_matching(&response.body)?))
255 }
256}
257
258impl MatchingHandle {
259 pub fn stream(&self) -> BoxStream<'static, Result<JobEvent>> {
261 self.jobs
262 .stream(self.job_id.clone(), StreamOptions::default())
263 }
264
265 pub fn stream_with(&self, options: StreamOptions) -> BoxStream<'static, Result<JobEvent>> {
267 self.jobs.stream(self.job_id.clone(), options)
268 }
269
270 pub async fn wait(&self) -> Result<Matching> {
272 self.wait_with(WaitOptions::default()).await
273 }
274
275 pub async fn wait_for(&self, timeout: Duration) -> Result<Matching> {
277 self.wait_with(WaitOptions::default().timeout(timeout))
278 .await
279 }
280
281 pub async fn wait_with(&self, options: WaitOptions) -> Result<Matching> {
283 let job = self.jobs.wait(&self.job_id, options).await?;
284 match job.matching_id {
285 Some(matching_id) => self.matching.get(&matching_id).await,
286 None => Err(ConduitError::invalid_response(format!(
287 "job {} succeeded but no matchingId was returned",
288 self.job_id
289 ))),
290 }
291 }
292
293 pub async fn cancel(&self) -> Result<Job> {
295 self.jobs.cancel(&self.job_id).await
296 }
297
298 pub async fn cancel_with(&self, options: ActionOptions) -> Result<Job> {
300 self.jobs.cancel_with(&self.job_id, options).await
301 }
302
303 pub async fn job(&self) -> Result<Job> {
305 self.jobs.get(&self.job_id).await
306 }
307
308 pub async fn matching(&self) -> Result<Option<Matching>> {
310 self.matching.get_by_job(&self.job_id).await
311 }
312}
313
314fn normalize_subject(subject: &SubjectRef) -> Result<Value> {
315 let mut payload = Map::new();
316 match subject {
317 SubjectRef::Entity { entity_id } => {
318 payload.insert("type".to_string(), Value::String("entity_id".to_string()));
319 payload.insert(
320 "entityId".to_string(),
321 Value::String(require_non_empty(entity_id, "subject.entityId")?),
322 );
323 }
324 SubjectRef::Media { media_id, selector } => {
325 payload.insert(
326 "type".to_string(),
327 Value::String("media_target".to_string()),
328 );
329 payload.insert(
330 "mediaId".to_string(),
331 Value::String(require_non_empty(media_id, "subject.mediaId")?),
332 );
333 payload.insert("selector".to_string(), normalize_target(selector)?);
334 }
335 }
336 Ok(Value::Object(payload))
337}
338
339fn ensure_unique_direct_entity_ids(target: &Value, group: &[Value]) -> Result<()> {
340 let mut seen = HashSet::new();
341 for subject in std::iter::once(target).chain(group.iter()) {
342 let Some(object) = subject.as_object() else {
343 continue;
344 };
345 let Some(entity_id) = object.get("entityId").and_then(Value::as_str) else {
346 continue;
347 };
348 if object.get("mediaId").is_some() {
349 continue;
350 }
351 if !seen.insert(entity_id.to_string()) {
352 return Err(ConduitError::invalid_request(
353 "target and group must reference different direct entity IDs",
354 ));
355 }
356 }
357 Ok(())
358}