Skip to main content

shelly_data/
integrations.rs

1use crate::{
2    error::{DataError, DataResult},
3    query::Query,
4    repo::{Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9    collections::{BTreeMap, HashMap},
10    fmt,
11    sync::{Arc, Mutex},
12    thread,
13    time::Duration,
14};
15
/// Category of a failure reported by an integration.
///
/// Serialized with `snake_case` variant names. [`IntegrationError::new`] uses
/// the kind to decide whether the resulting error is flagged as retryable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrationErrorKind {
    /// Flagged retryable by [`IntegrationError::new`].
    Transient,
    /// Not flagged retryable.
    Permanent,
    /// Not flagged retryable.
    Auth,
    /// Flagged retryable by [`IntegrationError::new`].
    RateLimited,
    /// Flagged retryable by [`IntegrationError::new`].
    Timeout,
    /// Flagged retryable by [`IntegrationError::new`].
    Unavailable,
    /// Not flagged retryable.
    InvalidInput,
}
27
28#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
29pub struct IntegrationError {
30    pub source: String,
31    pub kind: IntegrationErrorKind,
32    pub message: String,
33    pub code: Option<String>,
34    pub retryable: bool,
35}
36
37impl IntegrationError {
38    pub fn new(
39        source: impl Into<String>,
40        kind: IntegrationErrorKind,
41        message: impl Into<String>,
42    ) -> Self {
43        let kind_value = kind;
44        Self {
45            source: source.into(),
46            kind: kind_value,
47            message: message.into(),
48            code: None,
49            retryable: matches!(
50                kind_value,
51                IntegrationErrorKind::Transient
52                    | IntegrationErrorKind::RateLimited
53                    | IntegrationErrorKind::Timeout
54                    | IntegrationErrorKind::Unavailable
55            ),
56        }
57    }
58
59    pub fn with_code(mut self, code: impl Into<String>) -> Self {
60        self.code = Some(code.into());
61        self
62    }
63}
64
65impl fmt::Display for IntegrationError {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        if let Some(code) = &self.code {
68            write!(f, "[{}:{}] {}", self.source, code, self.message)
69        } else {
70            write!(f, "[{}] {}", self.source, self.message)
71        }
72    }
73}
74
// Empty impl: only the trait's default methods are used, so no underlying
// cause is exposed through `Error::source`.
impl std::error::Error for IntegrationError {}
76
/// `Result` alias whose error type is [`IntegrationError`].
pub type IntegrationResult<T> = Result<T, IntegrationError>;
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
80pub struct RetryPolicy {
81    pub max_attempts: u32,
82    pub initial_backoff_ms: u64,
83    pub max_backoff_ms: u64,
84}
85
86impl RetryPolicy {
87    pub fn conservative() -> Self {
88        Self {
89            max_attempts: 3,
90            initial_backoff_ms: 50,
91            max_backoff_ms: 500,
92        }
93    }
94
95    pub fn never() -> Self {
96        Self {
97            max_attempts: 1,
98            initial_backoff_ms: 0,
99            max_backoff_ms: 0,
100        }
101    }
102}
103
104impl Default for RetryPolicy {
105    fn default() -> Self {
106        Self::conservative()
107    }
108}
109
110pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
111where
112    F: FnMut(u32) -> IntegrationResult<T>,
113{
114    let attempts = policy.max_attempts.max(1);
115    let mut backoff_ms = policy.initial_backoff_ms;
116
117    for attempt in 1..=attempts {
118        match operation(attempt) {
119            Ok(value) => return Ok(value),
120            Err(err) if !err.retryable || attempt == attempts => return Err(err),
121            Err(_) => {
122                if backoff_ms > 0 {
123                    thread::sleep(Duration::from_millis(backoff_ms));
124                }
125                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
126            }
127        }
128    }
129
130    Err(IntegrationError::new(
131        "retry",
132        IntegrationErrorKind::Transient,
133        "retry exhausted",
134    ))
135}
136
/// Callbacks invoked around connection establishment and teardown.
///
/// Both methods default to doing nothing and returning `Ok(())`, so
/// implementors override only the events they care about.
pub trait ConnectionLifecycleHook: Send + Sync {
    /// Called on connect; the default implementation succeeds immediately.
    fn on_connect(&self) -> IntegrationResult<()> {
        Ok(())
    }

    /// Called on disconnect; the default implementation succeeds immediately.
    fn on_disconnect(&self) -> IntegrationResult<()> {
        Ok(())
    }
}
146
/// A component that holds [`ConnectionLifecycleHook`]s and drives them.
pub trait ConnectionLifecycle {
    /// Adds `hook` to the set of hooks driven by `connect` / `disconnect`.
    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
    /// Signals a connect event to the registered hooks.
    fn connect(&self) -> IntegrationResult<()>;
    /// Signals a disconnect event to the registered hooks.
    fn disconnect(&self) -> IntegrationResult<()>;
}
152
153#[derive(Default)]
154pub struct LifecycleHooks {
155    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
156}
157
158impl LifecycleHooks {
159    pub fn new() -> Self {
160        Self::default()
161    }
162}
163
164impl ConnectionLifecycle for LifecycleHooks {
165    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
166        self.hooks.push(hook);
167    }
168
169    fn connect(&self) -> IntegrationResult<()> {
170        for hook in &self.hooks {
171            hook.on_connect()?;
172        }
173        Ok(())
174    }
175
176    fn disconnect(&self) -> IntegrationResult<()> {
177        for hook in &self.hooks {
178            hook.on_disconnect()?;
179        }
180        Ok(())
181    }
182}
183
/// Per-call context handed to [`TypedQueryBoundary::execute`].
///
/// The default value has no tenant, no trace id and no tags.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct QueryContext {
    /// Optional tenant identifier.
    pub tenant_id: Option<String>,
    /// Optional trace identifier.
    pub trace_id: Option<String>,
    /// Free-form key/value tags, kept sorted by key.
    pub tags: BTreeMap<String, String>,
}
190
/// A typed request/response boundary to an external system.
pub trait TypedQueryBoundary {
    /// Request type accepted by [`TypedQueryBoundary::execute`].
    type Request: Send + Sync;
    /// Response type produced on success.
    type Response: Send + Sync;

    /// Executes `request` under `context`, returning the typed response or an
    /// [`IntegrationError`].
    fn execute(
        &self,
        request: &Self::Request,
        context: &QueryContext,
    ) -> IntegrationResult<Self::Response>;
}
201
202#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
203pub struct SqlCommand {
204    pub statement: String,
205    pub params: Vec<Value>,
206}
207
208impl SqlCommand {
209    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
210        Self {
211            statement: statement.into(),
212            params,
213        }
214    }
215}
216
217pub trait SingleStoreAdapter:
218    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
219{
220    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
221        self.execute(&query, context)
222    }
223}
224
225#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
226pub struct SearchRequest {
227    pub index: String,
228    pub text: String,
229    pub filters: Vec<crate::query::Filter>,
230    pub page: usize,
231    pub per_page: usize,
232}
233
234impl SearchRequest {
235    pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
236        Self {
237            index: index.into(),
238            text: text.into(),
239            filters: Vec::new(),
240            page: 1,
241            per_page: 25,
242        }
243    }
244
245    pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
246        self.filters.push(filter);
247        self
248    }
249}
250
/// Result of a search: a hit count plus the returned rows.
///
/// The default value has zero hits and no rows.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SearchResponse {
    /// Hit count reported by the adapter.
    pub total_hits: usize,
    /// Rows included in this response.
    pub rows: Vec<StoredRow>,
}
256
257pub trait OpenSearchAdapter:
258    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
259{
260    fn search(
261        &self,
262        request: SearchRequest,
263        context: &QueryContext,
264    ) -> IntegrationResult<SearchResponse> {
265        self.execute(&request, context)
266    }
267}
268
/// Destination that accepts [`AnalyticsEvent`]s.
pub trait AnalyticsSink: Send + Sync {
    /// Delivers `event` to the sink, or reports why it could not be delivered.
    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
}
272
273#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
274pub struct AnalyticsEvent {
275    pub namespace: String,
276    pub name: String,
277    pub payload: Value,
278}
279
280impl AnalyticsEvent {
281    pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
282        Self {
283            namespace: namespace.into(),
284            name: name.into(),
285            payload,
286        }
287    }
288}
289
290#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
291pub struct JobRequest {
292    pub workflow: String,
293    pub payload: Value,
294    pub idempotency_key: String,
295    pub metadata: BTreeMap<String, String>,
296}
297
298impl JobRequest {
299    pub fn new(
300        workflow: impl Into<String>,
301        payload: Value,
302        idempotency_key: impl Into<String>,
303    ) -> Self {
304        Self {
305            workflow: workflow.into(),
306            payload,
307            idempotency_key: idempotency_key.into(),
308            metadata: BTreeMap::new(),
309        }
310    }
311}
312
/// State of a job, serialized with `snake_case` variant names.
///
/// `InMemoryJobOrchestrator` treats `Succeeded`, `Failed` and `Canceled` as
/// terminal when polling.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
    Canceled,
}
322
/// Snapshot of a job as reported by a [`JobOrchestrator`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobStatus {
    /// Job identifier.
    pub id: String,
    /// Current state.
    pub state: JobState,
    /// Attempt counter maintained by the orchestrator.
    pub attempts: u32,
    /// Result payload, if any has been recorded.
    pub result: Option<Value>,
    /// Error, if any has been recorded.
    pub error: Option<IntegrationError>,
}
331
/// Handle returned by [`JobOrchestrator::enqueue`]; `id` is what
/// `status` and `poll` take.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JobHandle {
    /// Identifier of the job.
    pub id: String,
    /// Workflow name.
    pub workflow: String,
    /// Idempotency key.
    pub idempotency_key: String,
}
338
/// Shared, thread-safe callback that receives a [`JobStatus`] by reference.
pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
340
/// Contract for submitting jobs and observing their progress.
pub trait JobOrchestrator: Send + Sync {
    /// Submits `request` and returns a handle identifying the job.
    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
    /// Returns the current status of job `id`.
    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
    /// Returns a status for job `id` given a polling budget of `attempts`
    /// checks and `backoff_ms` milliseconds; how the budget is used is up to
    /// the implementor.
    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
    /// Registers a callback to be told about job status changes.
    fn register_completion_callback(&self, callback: JobCompletionCallback);
}
347
348#[derive(Debug, Clone, Default)]
349pub struct InMemorySingleStoreAdapter {
350    rows: Vec<Row>,
351}
352
353impl InMemorySingleStoreAdapter {
354    pub fn new(rows: Vec<Row>) -> Self {
355        Self { rows }
356    }
357}
358
359impl TypedQueryBoundary for InMemorySingleStoreAdapter {
360    type Request = SqlCommand;
361    type Response = Vec<Row>;
362
363    fn execute(
364        &self,
365        request: &Self::Request,
366        _context: &QueryContext,
367    ) -> IntegrationResult<Self::Response> {
368        if request.statement.trim().is_empty() {
369            return Err(IntegrationError::new(
370                "singlestore",
371                IntegrationErrorKind::InvalidInput,
372                "empty SQL statement",
373            )
374            .with_code("empty_statement"));
375        }
376        Ok(self.rows.clone())
377    }
378}
379
380impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
381
382#[derive(Debug, Clone, Default)]
383pub struct InMemoryOpenSearchAdapter {
384    rows: Vec<StoredRow>,
385}
386
387impl InMemoryOpenSearchAdapter {
388    pub fn new(rows: Vec<StoredRow>) -> Self {
389        Self { rows }
390    }
391}
392
393impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
394    type Request = SearchRequest;
395    type Response = SearchResponse;
396
397    fn execute(
398        &self,
399        request: &Self::Request,
400        _context: &QueryContext,
401    ) -> IntegrationResult<Self::Response> {
402        if request.index.trim().is_empty() {
403            return Err(IntegrationError::new(
404                "opensearch",
405                IntegrationErrorKind::InvalidInput,
406                "search index must not be empty",
407            )
408            .with_code("empty_index"));
409        }
410        let needle = request.text.trim().to_lowercase();
411        let mut filtered = self
412            .rows
413            .iter()
414            .filter(|row| {
415                if needle.is_empty() {
416                    return true;
417                }
418                row.data.values().any(|value| {
419                    value
420                        .as_str()
421                        .map(|text| text.to_lowercase().contains(&needle))
422                        .unwrap_or(false)
423                })
424            })
425            .cloned()
426            .collect::<Vec<_>>();
427        let total_hits = filtered.len();
428        let page = request.page.max(1);
429        let per_page = request.per_page.max(1);
430        let offset = (page - 1) * per_page;
431        filtered = filtered.into_iter().skip(offset).take(per_page).collect();
432        Ok(SearchResponse {
433            total_hits,
434            rows: filtered,
435        })
436    }
437}
438
439impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
440
441#[derive(Debug, Default, Clone)]
442pub struct InMemoryAxiomSink {
443    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
444}
445
446impl InMemoryAxiomSink {
447    pub fn events(&self) -> Vec<AnalyticsEvent> {
448        self.events
449            .lock()
450            .map(|events| events.clone())
451            .unwrap_or_default()
452    }
453}
454
455impl AnalyticsSink for InMemoryAxiomSink {
456    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
457        self.events
458            .lock()
459            .map_err(|_| {
460                IntegrationError::new(
461                    "axiom",
462                    IntegrationErrorKind::Unavailable,
463                    "analytics sink lock poisoned",
464                )
465            })?
466            .push(event);
467        Ok(())
468    }
469}
470
471#[derive(Default)]
472pub struct InMemoryJobOrchestrator {
473    statuses: Mutex<HashMap<String, JobStatus>>,
474    callbacks: Mutex<Vec<JobCompletionCallback>>,
475    next_id: Mutex<u64>,
476}
477
478impl InMemoryJobOrchestrator {
479    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
480        let status = self.with_status_mut(id, |status| {
481            status.state = JobState::Succeeded;
482            status.result = Some(result);
483            status.error = None;
484        })?;
485        self.notify(&status);
486        Ok(())
487    }
488
489    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
490        let status = self.with_status_mut(id, |status| {
491            status.state = JobState::Failed;
492            status.result = None;
493            status.error = Some(error);
494        })?;
495        self.notify(&status);
496        Ok(())
497    }
498
499    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
500    where
501        F: FnOnce(&mut JobStatus),
502    {
503        let mut statuses = self.statuses.lock().map_err(|_| {
504            IntegrationError::new(
505                "trigger",
506                IntegrationErrorKind::Unavailable,
507                "job status lock poisoned",
508            )
509        })?;
510        let Some(status) = statuses.get_mut(id) else {
511            return Err(IntegrationError::new(
512                "trigger",
513                IntegrationErrorKind::InvalidInput,
514                "job not found",
515            )
516            .with_code("job_not_found"));
517        };
518        status.attempts = status.attempts.saturating_add(1);
519        update(status);
520        Ok(status.clone())
521    }
522
523    fn notify(&self, status: &JobStatus) {
524        if let Ok(callbacks) = self.callbacks.lock() {
525            for callback in callbacks.iter() {
526                callback(status);
527            }
528        }
529    }
530
531    fn is_terminal(state: JobState) -> bool {
532        matches!(
533            state,
534            JobState::Succeeded | JobState::Failed | JobState::Canceled
535        )
536    }
537}
538
539impl JobOrchestrator for InMemoryJobOrchestrator {
540    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
541        let mut next_id = self.next_id.lock().map_err(|_| {
542            IntegrationError::new(
543                "trigger",
544                IntegrationErrorKind::Unavailable,
545                "job id lock poisoned",
546            )
547        })?;
548        *next_id = next_id.saturating_add(1);
549        let id = format!("job-{next_id}");
550        let status = JobStatus {
551            id: id.clone(),
552            state: JobState::Queued,
553            attempts: 0,
554            result: None,
555            error: None,
556        };
557        self.statuses
558            .lock()
559            .map_err(|_| {
560                IntegrationError::new(
561                    "trigger",
562                    IntegrationErrorKind::Unavailable,
563                    "job status lock poisoned",
564                )
565            })?
566            .insert(id.clone(), status);
567        Ok(JobHandle {
568            id,
569            workflow: request.workflow,
570            idempotency_key: request.idempotency_key,
571        })
572    }
573
574    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
575        self.statuses
576            .lock()
577            .map_err(|_| {
578                IntegrationError::new(
579                    "trigger",
580                    IntegrationErrorKind::Unavailable,
581                    "job status lock poisoned",
582                )
583            })?
584            .get(id)
585            .cloned()
586            .ok_or_else(|| {
587                IntegrationError::new(
588                    "trigger",
589                    IntegrationErrorKind::InvalidInput,
590                    "job not found",
591                )
592                .with_code("job_not_found")
593            })
594    }
595
596    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
597        let attempts = attempts.max(1);
598        for current in 1..=attempts {
599            let status = self.status(id)?;
600            if Self::is_terminal(status.state) || current == attempts {
601                return Ok(status);
602            }
603            if backoff_ms > 0 {
604                thread::sleep(Duration::from_millis(backoff_ms));
605            }
606        }
607
608        self.status(id)
609    }
610
611    fn register_completion_callback(&self, callback: JobCompletionCallback) {
612        if let Ok(mut callbacks) = self.callbacks.lock() {
613            callbacks.push(callback);
614        }
615    }
616}
617
618pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
619    DataError::Integration(format!("[{}] {}", source.into(), err))
620}
621
622pub fn map_integration_result<T>(
623    source: impl Into<String>,
624    result: IntegrationResult<T>,
625) -> DataResult<T> {
626    result.map_err(|err| map_integration_error(source, err))
627}
628
629pub fn query_from_search(request: &SearchRequest) -> Query {
630    let mut query = Query::new().paginate(request.page, request.per_page);
631    for filter in &request.filters {
632        query = query.where_filter(filter.clone());
633    }
634    query
635}
636
#[cfg(test)]
mod tests {
    use super::{
        map_integration_result, run_with_retry, AnalyticsEvent, AnalyticsSink, ConnectionLifecycle,
        ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
        SingleStoreAdapter, SqlCommand,
    };
    use serde_json::{json, Value};
    use std::collections::BTreeMap;
    use std::sync::{Arc, Mutex};

    /// Hook that counts how many times each lifecycle event fired.
    struct CountingHook {
        connects: Arc<Mutex<u32>>,
        disconnects: Arc<Mutex<u32>>,
    }

    impl ConnectionLifecycleHook for CountingHook {
        fn on_connect(&self) -> super::IntegrationResult<()> {
            *self.connects.lock().unwrap() += 1;
            Ok(())
        }

        fn on_disconnect(&self) -> super::IntegrationResult<()> {
            *self.disconnects.lock().unwrap() += 1;
            Ok(())
        }
    }

    #[test]
    fn lifecycle_hooks_are_called() {
        let connect_count = Arc::new(Mutex::new(0));
        let disconnect_count = Arc::new(Mutex::new(0));
        let hook = CountingHook {
            connects: Arc::clone(&connect_count),
            disconnects: Arc::clone(&disconnect_count),
        };

        let mut hooks = LifecycleHooks::new();
        hooks.register_hook(Arc::new(hook));
        hooks.connect().unwrap();
        hooks.disconnect().unwrap();

        assert_eq!(*connect_count.lock().unwrap(), 1);
        assert_eq!(*disconnect_count.lock().unwrap(), 1);
    }

    #[test]
    fn retry_policy_retries_transient_errors() {
        let mut last_attempt = 0u32;
        let outcome = run_with_retry(RetryPolicy::conservative(), |attempt| {
            last_attempt = attempt;
            if attempt >= 3 {
                return Ok("ok");
            }
            Err(IntegrationError::new(
                "opensearch",
                IntegrationErrorKind::Transient,
                "temporary failure",
            ))
        });

        assert_eq!(outcome.unwrap(), "ok");
        assert_eq!(last_attempt, 3);
    }

    #[test]
    fn retry_policy_stops_on_permanent_errors() {
        let mut last_attempt = 0u32;
        let outcome: super::IntegrationResult<()> =
            run_with_retry(RetryPolicy::conservative(), |attempt| {
                last_attempt = attempt;
                Err(IntegrationError::new(
                    "singlestore",
                    IntegrationErrorKind::Permanent,
                    "invalid sql",
                ))
            });
        let failure = outcome.unwrap_err();

        assert_eq!(last_attempt, 1);
        assert_eq!(failure.kind, IntegrationErrorKind::Permanent);
    }

    #[test]
    fn integration_result_maps_into_data_error() {
        let failure = IntegrationError::new(
            "trigger",
            IntegrationErrorKind::Unavailable,
            "service unavailable",
        );
        let mapped = map_integration_result::<()>("trigger", Err(failure)).unwrap_err();

        assert!(mapped.to_string().contains("service unavailable"));
    }

    /// Minimal orchestrator used to exercise the `JobOrchestrator` contract
    /// independently of the reference implementation.
    #[derive(Default)]
    struct InMemoryJobs {
        statuses: Arc<Mutex<Vec<JobStatus>>>,
        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
    }

    impl JobOrchestrator for InMemoryJobs {
        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
            let id = format!("job-{}", request.idempotency_key);
            let queued = JobStatus {
                id: id.clone(),
                state: JobState::Queued,
                attempts: 0,
                result: None,
                error: None,
            };
            self.statuses.lock().unwrap().push(queued);
            Ok(JobHandle {
                id,
                workflow: request.workflow,
                idempotency_key: request.idempotency_key,
            })
        }

        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
            let statuses = self.statuses.lock().unwrap();
            match statuses.iter().find(|status| status.id == id) {
                Some(found) => Ok(found.clone()),
                None => Err(IntegrationError::new(
                    "trigger",
                    IntegrationErrorKind::InvalidInput,
                    "job not found",
                )),
            }
        }

        fn poll(
            &self,
            id: &str,
            _attempts: u32,
            _backoff_ms: u64,
        ) -> super::IntegrationResult<JobStatus> {
            self.status(id)
        }

        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
            self.callbacks.lock().unwrap().push(callback);
        }
    }

    #[test]
    fn job_orchestration_contract_supports_enqueue_and_status() {
        let jobs = InMemoryJobs::default();
        let request = JobRequest::new("sync_customer", json!({"id": 42}), "idempotent-42");
        let handle = jobs.enqueue(request).unwrap();

        let status = jobs.status(&handle.id).unwrap();
        assert_eq!(status.state, JobState::Queued);
        assert_eq!(handle.idempotency_key, "idempotent-42");

        let ctx = QueryContext {
            tenant_id: Some("tenant-a".to_string()),
            ..QueryContext::default()
        };
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
    }

    #[test]
    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
        let stored = BTreeMap::from([(
            "region".to_string(),
            Value::String("EMEA".to_string()),
        )]);
        let adapter = InMemorySingleStoreAdapter::new(vec![stored]);

        let command = SqlCommand::new("SELECT region FROM accounts", Vec::new());
        let rows = adapter
            .run_query(command, &QueryContext::default())
            .unwrap();

        assert_eq!(rows.len(), 1);
        assert_eq!(
            rows[0].get("region"),
            Some(&Value::String("EMEA".to_string()))
        );
    }

    #[test]
    fn reference_opensearch_adapter_filters_rows() {
        let titled = |id, title: &str| crate::StoredRow {
            id,
            data: BTreeMap::from([("title".to_string(), Value::String(title.to_string()))]),
        };
        let adapter = InMemoryOpenSearchAdapter::new(vec![
            titled(1, "Acme renewal"),
            titled(2, "Globex onboarding"),
        ]);

        let request = SearchRequest::new("accounts", "renewal");
        let response = adapter.search(request, &QueryContext::default()).unwrap();

        assert_eq!(response.total_hits, 1);
        assert_eq!(response.rows[0].id, 1);
    }

    #[test]
    fn reference_axiom_sink_records_events() {
        let sink = InMemoryAxiomSink::default();
        let event = AnalyticsEvent::new("sales", "query_executed", json!({"latency_ms": 12}));
        sink.send_event(event).unwrap();

        let recorded = sink.events();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].name, "query_executed");
    }

    #[test]
    fn reference_trigger_orchestrator_supports_completion_and_polling() {
        let orchestrator = InMemoryJobOrchestrator::default();
        let completed = Arc::new(Mutex::new(false));
        let flag = Arc::clone(&completed);
        orchestrator.register_completion_callback(Arc::new(move |status| {
            if status.state != JobState::Succeeded {
                return;
            }
            if let Ok(mut guard) = flag.lock() {
                *guard = true;
            }
        }));

        let request = JobRequest::new("refresh_dashboard", json!({"account_id": 7}), "refresh-7");
        let handle = orchestrator.enqueue(request).unwrap();
        orchestrator
            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
            .unwrap();

        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
        assert_eq!(status.state, JobState::Succeeded);
        assert_eq!(status.result, Some(json!({"rows_synced": 18})));
        assert!(*completed.lock().unwrap());
    }
}