1use crate::{
2 error::{DataError, DataResult},
3 query::Query,
4 repo::{Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt,
11 sync::{Arc, Mutex},
12 thread,
13 time::Duration,
14};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum IntegrationErrorKind {
19 Transient,
20 Permanent,
21 Auth,
22 RateLimited,
23 Timeout,
24 Unavailable,
25 InvalidInput,
26}
27
28#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
29pub struct IntegrationError {
30 pub source: String,
31 pub kind: IntegrationErrorKind,
32 pub message: String,
33 pub code: Option<String>,
34 pub retryable: bool,
35}
36
37impl IntegrationError {
38 pub fn new(
39 source: impl Into<String>,
40 kind: IntegrationErrorKind,
41 message: impl Into<String>,
42 ) -> Self {
43 let kind_value = kind;
44 Self {
45 source: source.into(),
46 kind: kind_value,
47 message: message.into(),
48 code: None,
49 retryable: matches!(
50 kind_value,
51 IntegrationErrorKind::Transient
52 | IntegrationErrorKind::RateLimited
53 | IntegrationErrorKind::Timeout
54 | IntegrationErrorKind::Unavailable
55 ),
56 }
57 }
58
59 pub fn with_code(mut self, code: impl Into<String>) -> Self {
60 self.code = Some(code.into());
61 self
62 }
63}
64
65impl fmt::Display for IntegrationError {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 if let Some(code) = &self.code {
68 write!(f, "[{}:{}] {}", self.source, code, self.message)
69 } else {
70 write!(f, "[{}] {}", self.source, self.message)
71 }
72 }
73}
74
75impl std::error::Error for IntegrationError {}
76
77pub type IntegrationResult<T> = Result<T, IntegrationError>;
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
80pub struct RetryPolicy {
81 pub max_attempts: u32,
82 pub initial_backoff_ms: u64,
83 pub max_backoff_ms: u64,
84}
85
86impl RetryPolicy {
87 pub fn conservative() -> Self {
88 Self {
89 max_attempts: 3,
90 initial_backoff_ms: 50,
91 max_backoff_ms: 500,
92 }
93 }
94
95 pub fn never() -> Self {
96 Self {
97 max_attempts: 1,
98 initial_backoff_ms: 0,
99 max_backoff_ms: 0,
100 }
101 }
102}
103
104impl Default for RetryPolicy {
105 fn default() -> Self {
106 Self::conservative()
107 }
108}
109
110pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
111where
112 F: FnMut(u32) -> IntegrationResult<T>,
113{
114 let attempts = policy.max_attempts.max(1);
115 let mut backoff_ms = policy.initial_backoff_ms;
116
117 for attempt in 1..=attempts {
118 match operation(attempt) {
119 Ok(value) => return Ok(value),
120 Err(err) if !err.retryable || attempt == attempts => return Err(err),
121 Err(_) => {
122 if backoff_ms > 0 {
123 thread::sleep(Duration::from_millis(backoff_ms));
124 }
125 backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
126 }
127 }
128 }
129
130 Err(IntegrationError::new(
131 "retry",
132 IntegrationErrorKind::Transient,
133 "retry exhausted",
134 ))
135}
136
137pub trait ConnectionLifecycleHook: Send + Sync {
138 fn on_connect(&self) -> IntegrationResult<()> {
139 Ok(())
140 }
141
142 fn on_disconnect(&self) -> IntegrationResult<()> {
143 Ok(())
144 }
145}
146
147pub trait ConnectionLifecycle {
148 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
149 fn connect(&self) -> IntegrationResult<()>;
150 fn disconnect(&self) -> IntegrationResult<()>;
151}
152
153#[derive(Default)]
154pub struct LifecycleHooks {
155 hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
156}
157
158impl LifecycleHooks {
159 pub fn new() -> Self {
160 Self::default()
161 }
162}
163
164impl ConnectionLifecycle for LifecycleHooks {
165 fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
166 self.hooks.push(hook);
167 }
168
169 fn connect(&self) -> IntegrationResult<()> {
170 for hook in &self.hooks {
171 hook.on_connect()?;
172 }
173 Ok(())
174 }
175
176 fn disconnect(&self) -> IntegrationResult<()> {
177 for hook in &self.hooks {
178 hook.on_disconnect()?;
179 }
180 Ok(())
181 }
182}
183
184#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
185pub struct QueryContext {
186 pub tenant_id: Option<String>,
187 pub trace_id: Option<String>,
188 pub tags: BTreeMap<String, String>,
189}
190
191pub trait TypedQueryBoundary {
192 type Request: Send + Sync;
193 type Response: Send + Sync;
194
195 fn execute(
196 &self,
197 request: &Self::Request,
198 context: &QueryContext,
199 ) -> IntegrationResult<Self::Response>;
200}
201
202#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
203pub struct SqlCommand {
204 pub statement: String,
205 pub params: Vec<Value>,
206}
207
208impl SqlCommand {
209 pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
210 Self {
211 statement: statement.into(),
212 params,
213 }
214 }
215}
216
217pub trait SingleStoreAdapter:
218 TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
219{
220 fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
221 self.execute(&query, context)
222 }
223}
224
225#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
226pub struct SearchRequest {
227 pub index: String,
228 pub text: String,
229 pub filters: Vec<crate::query::Filter>,
230 pub page: usize,
231 pub per_page: usize,
232}
233
234impl SearchRequest {
235 pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
236 Self {
237 index: index.into(),
238 text: text.into(),
239 filters: Vec::new(),
240 page: 1,
241 per_page: 25,
242 }
243 }
244
245 pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
246 self.filters.push(filter);
247 self
248 }
249}
250
251#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
252pub struct SearchResponse {
253 pub total_hits: usize,
254 pub rows: Vec<StoredRow>,
255}
256
257pub trait OpenSearchAdapter:
258 TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
259{
260 fn search(
261 &self,
262 request: SearchRequest,
263 context: &QueryContext,
264 ) -> IntegrationResult<SearchResponse> {
265 self.execute(&request, context)
266 }
267}
268
269pub trait AnalyticsSink: Send + Sync {
270 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
271}
272
273#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
274pub struct AnalyticsEvent {
275 pub namespace: String,
276 pub name: String,
277 pub payload: Value,
278}
279
280impl AnalyticsEvent {
281 pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
282 Self {
283 namespace: namespace.into(),
284 name: name.into(),
285 payload,
286 }
287 }
288}
289
290#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
291pub struct JobRequest {
292 pub workflow: String,
293 pub payload: Value,
294 pub idempotency_key: String,
295 pub metadata: BTreeMap<String, String>,
296}
297
298impl JobRequest {
299 pub fn new(
300 workflow: impl Into<String>,
301 payload: Value,
302 idempotency_key: impl Into<String>,
303 ) -> Self {
304 Self {
305 workflow: workflow.into(),
306 payload,
307 idempotency_key: idempotency_key.into(),
308 metadata: BTreeMap::new(),
309 }
310 }
311}
312
313#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
314#[serde(rename_all = "snake_case")]
315pub enum JobState {
316 Queued,
317 Running,
318 Succeeded,
319 Failed,
320 Canceled,
321}
322
323#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
324pub struct JobStatus {
325 pub id: String,
326 pub state: JobState,
327 pub attempts: u32,
328 pub result: Option<Value>,
329 pub error: Option<IntegrationError>,
330}
331
332#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
333pub struct JobHandle {
334 pub id: String,
335 pub workflow: String,
336 pub idempotency_key: String,
337}
338
339pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
340
341pub trait JobOrchestrator: Send + Sync {
342 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
343 fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
344 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
345 fn register_completion_callback(&self, callback: JobCompletionCallback);
346}
347
348#[derive(Debug, Clone, Default)]
349pub struct InMemorySingleStoreAdapter {
350 rows: Vec<Row>,
351}
352
353impl InMemorySingleStoreAdapter {
354 pub fn new(rows: Vec<Row>) -> Self {
355 Self { rows }
356 }
357}
358
359impl TypedQueryBoundary for InMemorySingleStoreAdapter {
360 type Request = SqlCommand;
361 type Response = Vec<Row>;
362
363 fn execute(
364 &self,
365 request: &Self::Request,
366 _context: &QueryContext,
367 ) -> IntegrationResult<Self::Response> {
368 if request.statement.trim().is_empty() {
369 return Err(IntegrationError::new(
370 "singlestore",
371 IntegrationErrorKind::InvalidInput,
372 "empty SQL statement",
373 )
374 .with_code("empty_statement"));
375 }
376 Ok(self.rows.clone())
377 }
378}
379
380impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
381
382#[derive(Debug, Clone, Default)]
383pub struct InMemoryOpenSearchAdapter {
384 rows: Vec<StoredRow>,
385}
386
387impl InMemoryOpenSearchAdapter {
388 pub fn new(rows: Vec<StoredRow>) -> Self {
389 Self { rows }
390 }
391}
392
393impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
394 type Request = SearchRequest;
395 type Response = SearchResponse;
396
397 fn execute(
398 &self,
399 request: &Self::Request,
400 _context: &QueryContext,
401 ) -> IntegrationResult<Self::Response> {
402 if request.index.trim().is_empty() {
403 return Err(IntegrationError::new(
404 "opensearch",
405 IntegrationErrorKind::InvalidInput,
406 "search index must not be empty",
407 )
408 .with_code("empty_index"));
409 }
410 let needle = request.text.trim().to_lowercase();
411 let mut filtered = self
412 .rows
413 .iter()
414 .filter(|row| {
415 if needle.is_empty() {
416 return true;
417 }
418 row.data.values().any(|value| {
419 value
420 .as_str()
421 .map(|text| text.to_lowercase().contains(&needle))
422 .unwrap_or(false)
423 })
424 })
425 .cloned()
426 .collect::<Vec<_>>();
427 let total_hits = filtered.len();
428 let page = request.page.max(1);
429 let per_page = request.per_page.max(1);
430 let offset = (page - 1) * per_page;
431 filtered = filtered.into_iter().skip(offset).take(per_page).collect();
432 Ok(SearchResponse {
433 total_hits,
434 rows: filtered,
435 })
436 }
437}
438
439impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
440
441#[derive(Debug, Default, Clone)]
442pub struct InMemoryAxiomSink {
443 events: Arc<Mutex<Vec<AnalyticsEvent>>>,
444}
445
446impl InMemoryAxiomSink {
447 pub fn events(&self) -> Vec<AnalyticsEvent> {
448 self.events
449 .lock()
450 .map(|events| events.clone())
451 .unwrap_or_default()
452 }
453}
454
455impl AnalyticsSink for InMemoryAxiomSink {
456 fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
457 self.events
458 .lock()
459 .map_err(|_| {
460 IntegrationError::new(
461 "axiom",
462 IntegrationErrorKind::Unavailable,
463 "analytics sink lock poisoned",
464 )
465 })?
466 .push(event);
467 Ok(())
468 }
469}
470
471#[derive(Default)]
472pub struct InMemoryJobOrchestrator {
473 statuses: Mutex<HashMap<String, JobStatus>>,
474 callbacks: Mutex<Vec<JobCompletionCallback>>,
475 next_id: Mutex<u64>,
476}
477
478impl InMemoryJobOrchestrator {
479 pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
480 let status = self.with_status_mut(id, |status| {
481 status.state = JobState::Succeeded;
482 status.result = Some(result);
483 status.error = None;
484 })?;
485 self.notify(&status);
486 Ok(())
487 }
488
489 pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
490 let status = self.with_status_mut(id, |status| {
491 status.state = JobState::Failed;
492 status.result = None;
493 status.error = Some(error);
494 })?;
495 self.notify(&status);
496 Ok(())
497 }
498
499 fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
500 where
501 F: FnOnce(&mut JobStatus),
502 {
503 let mut statuses = self.statuses.lock().map_err(|_| {
504 IntegrationError::new(
505 "trigger",
506 IntegrationErrorKind::Unavailable,
507 "job status lock poisoned",
508 )
509 })?;
510 let Some(status) = statuses.get_mut(id) else {
511 return Err(IntegrationError::new(
512 "trigger",
513 IntegrationErrorKind::InvalidInput,
514 "job not found",
515 )
516 .with_code("job_not_found"));
517 };
518 status.attempts = status.attempts.saturating_add(1);
519 update(status);
520 Ok(status.clone())
521 }
522
523 fn notify(&self, status: &JobStatus) {
524 if let Ok(callbacks) = self.callbacks.lock() {
525 for callback in callbacks.iter() {
526 callback(status);
527 }
528 }
529 }
530
531 fn is_terminal(state: JobState) -> bool {
532 matches!(
533 state,
534 JobState::Succeeded | JobState::Failed | JobState::Canceled
535 )
536 }
537}
538
539impl JobOrchestrator for InMemoryJobOrchestrator {
540 fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
541 let mut next_id = self.next_id.lock().map_err(|_| {
542 IntegrationError::new(
543 "trigger",
544 IntegrationErrorKind::Unavailable,
545 "job id lock poisoned",
546 )
547 })?;
548 *next_id = next_id.saturating_add(1);
549 let id = format!("job-{next_id}");
550 let status = JobStatus {
551 id: id.clone(),
552 state: JobState::Queued,
553 attempts: 0,
554 result: None,
555 error: None,
556 };
557 self.statuses
558 .lock()
559 .map_err(|_| {
560 IntegrationError::new(
561 "trigger",
562 IntegrationErrorKind::Unavailable,
563 "job status lock poisoned",
564 )
565 })?
566 .insert(id.clone(), status);
567 Ok(JobHandle {
568 id,
569 workflow: request.workflow,
570 idempotency_key: request.idempotency_key,
571 })
572 }
573
574 fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
575 self.statuses
576 .lock()
577 .map_err(|_| {
578 IntegrationError::new(
579 "trigger",
580 IntegrationErrorKind::Unavailable,
581 "job status lock poisoned",
582 )
583 })?
584 .get(id)
585 .cloned()
586 .ok_or_else(|| {
587 IntegrationError::new(
588 "trigger",
589 IntegrationErrorKind::InvalidInput,
590 "job not found",
591 )
592 .with_code("job_not_found")
593 })
594 }
595
596 fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
597 let attempts = attempts.max(1);
598 for current in 1..=attempts {
599 let status = self.status(id)?;
600 if Self::is_terminal(status.state) || current == attempts {
601 return Ok(status);
602 }
603 if backoff_ms > 0 {
604 thread::sleep(Duration::from_millis(backoff_ms));
605 }
606 }
607
608 self.status(id)
609 }
610
611 fn register_completion_callback(&self, callback: JobCompletionCallback) {
612 if let Ok(mut callbacks) = self.callbacks.lock() {
613 callbacks.push(callback);
614 }
615 }
616}
617
618pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
619 DataError::Integration(format!("[{}] {}", source.into(), err))
620}
621
622pub fn map_integration_result<T>(
623 source: impl Into<String>,
624 result: IntegrationResult<T>,
625) -> DataResult<T> {
626 result.map_err(|err| map_integration_error(source, err))
627}
628
629pub fn query_from_search(request: &SearchRequest) -> Query {
630 let mut query = Query::new().paginate(request.page, request.per_page);
631 for filter in &request.filters {
632 query = query.where_filter(filter.clone());
633 }
634 query
635}
636
637#[cfg(test)]
638mod tests {
639 use super::{
640 map_integration_result, run_with_retry, AnalyticsEvent, AnalyticsSink, ConnectionLifecycle,
641 ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
642 InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
643 IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
644 LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
645 SingleStoreAdapter, SqlCommand,
646 };
647 use serde_json::{json, Value};
648 use std::sync::{Arc, Mutex};
649
650 struct CountingHook {
651 connects: Arc<Mutex<u32>>,
652 disconnects: Arc<Mutex<u32>>,
653 }
654
655 impl ConnectionLifecycleHook for CountingHook {
656 fn on_connect(&self) -> super::IntegrationResult<()> {
657 let mut guard = self.connects.lock().unwrap();
658 *guard += 1;
659 Ok(())
660 }
661
662 fn on_disconnect(&self) -> super::IntegrationResult<()> {
663 let mut guard = self.disconnects.lock().unwrap();
664 *guard += 1;
665 Ok(())
666 }
667 }
668
669 #[test]
670 fn lifecycle_hooks_are_called() {
671 let connects = Arc::new(Mutex::new(0));
672 let disconnects = Arc::new(Mutex::new(0));
673 let mut lifecycle = LifecycleHooks::new();
674 lifecycle.register_hook(Arc::new(CountingHook {
675 connects: connects.clone(),
676 disconnects: disconnects.clone(),
677 }));
678
679 lifecycle.connect().unwrap();
680 lifecycle.disconnect().unwrap();
681 assert_eq!(*connects.lock().unwrap(), 1);
682 assert_eq!(*disconnects.lock().unwrap(), 1);
683 }
684
685 #[test]
686 fn retry_policy_retries_transient_errors() {
687 let mut calls = 0u32;
688 let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
689 calls = attempt;
690 if attempt < 3 {
691 Err(IntegrationError::new(
692 "opensearch",
693 IntegrationErrorKind::Transient,
694 "temporary failure",
695 ))
696 } else {
697 Ok("ok")
698 }
699 })
700 .unwrap();
701
702 assert_eq!(result, "ok");
703 assert_eq!(calls, 3);
704 }
705
706 #[test]
707 fn retry_policy_stops_on_permanent_errors() {
708 let mut calls = 0u32;
709 let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
710 calls = attempt;
711 Err::<(), IntegrationError>(IntegrationError::new(
712 "singlestore",
713 IntegrationErrorKind::Permanent,
714 "invalid sql",
715 ))
716 })
717 .unwrap_err();
718
719 assert_eq!(calls, 1);
720 assert_eq!(err.kind, IntegrationErrorKind::Permanent);
721 }
722
723 #[test]
724 fn integration_result_maps_into_data_error() {
725 let mapped = map_integration_result::<()>(
726 "trigger",
727 Err(IntegrationError::new(
728 "trigger",
729 IntegrationErrorKind::Unavailable,
730 "service unavailable",
731 )),
732 )
733 .unwrap_err();
734
735 assert!(mapped.to_string().contains("service unavailable"));
736 }
737
738 #[derive(Default)]
739 struct InMemoryJobs {
740 statuses: Arc<Mutex<Vec<JobStatus>>>,
741 callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
742 }
743
744 impl JobOrchestrator for InMemoryJobs {
745 fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
746 let id = format!("job-{}", request.idempotency_key);
747 self.statuses.lock().unwrap().push(JobStatus {
748 id: id.clone(),
749 state: JobState::Queued,
750 attempts: 0,
751 result: None,
752 error: None,
753 });
754 Ok(JobHandle {
755 id,
756 workflow: request.workflow,
757 idempotency_key: request.idempotency_key,
758 })
759 }
760
761 fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
762 self.statuses
763 .lock()
764 .unwrap()
765 .iter()
766 .find(|status| status.id == id)
767 .cloned()
768 .ok_or_else(|| {
769 IntegrationError::new(
770 "trigger",
771 IntegrationErrorKind::InvalidInput,
772 "job not found",
773 )
774 })
775 }
776
777 fn poll(
778 &self,
779 id: &str,
780 _attempts: u32,
781 _backoff_ms: u64,
782 ) -> super::IntegrationResult<JobStatus> {
783 self.status(id)
784 }
785
786 fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
787 self.callbacks.lock().unwrap().push(callback);
788 }
789 }
790
791 #[test]
792 fn job_orchestration_contract_supports_enqueue_and_status() {
793 let jobs = InMemoryJobs::default();
794 let handle = jobs
795 .enqueue(JobRequest::new(
796 "sync_customer",
797 json!({"id": 42}),
798 "idempotent-42",
799 ))
800 .unwrap();
801 let status = jobs.status(&handle.id).unwrap();
802 assert_eq!(status.state, JobState::Queued);
803 assert_eq!(handle.idempotency_key, "idempotent-42");
804
805 let ctx = QueryContext {
806 tenant_id: Some("tenant-a".to_string()),
807 ..QueryContext::default()
808 };
809 assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
810 }
811
812 #[test]
813 fn reference_singlestore_adapter_runs_typed_sql_boundary() {
814 let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
815 "region".to_string(),
816 Value::String("EMEA".to_string()),
817 )])]);
818 let rows = adapter
819 .run_query(
820 SqlCommand::new("SELECT region FROM accounts", Vec::new()),
821 &QueryContext::default(),
822 )
823 .unwrap();
824 assert_eq!(rows.len(), 1);
825 assert_eq!(
826 rows[0].get("region"),
827 Some(&Value::String("EMEA".to_string()))
828 );
829 }
830
831 #[test]
832 fn reference_opensearch_adapter_filters_rows() {
833 let rows = vec![
834 crate::StoredRow {
835 id: 1,
836 data: std::collections::BTreeMap::from([(
837 "title".to_string(),
838 Value::String("Acme renewal".to_string()),
839 )]),
840 },
841 crate::StoredRow {
842 id: 2,
843 data: std::collections::BTreeMap::from([(
844 "title".to_string(),
845 Value::String("Globex onboarding".to_string()),
846 )]),
847 },
848 ];
849 let adapter = InMemoryOpenSearchAdapter::new(rows);
850 let response = adapter
851 .search(
852 SearchRequest::new("accounts", "renewal"),
853 &QueryContext::default(),
854 )
855 .unwrap();
856 assert_eq!(response.total_hits, 1);
857 assert_eq!(response.rows[0].id, 1);
858 }
859
860 #[test]
861 fn reference_axiom_sink_records_events() {
862 let sink = InMemoryAxiomSink::default();
863 sink.send_event(AnalyticsEvent::new(
864 "sales",
865 "query_executed",
866 json!({"latency_ms": 12}),
867 ))
868 .unwrap();
869 let events = sink.events();
870 assert_eq!(events.len(), 1);
871 assert_eq!(events[0].name, "query_executed");
872 }
873
874 #[test]
875 fn reference_trigger_orchestrator_supports_completion_and_polling() {
876 let orchestrator = InMemoryJobOrchestrator::default();
877 let completed = Arc::new(Mutex::new(false));
878 let completed_flag = completed.clone();
879 orchestrator.register_completion_callback(Arc::new(move |status| {
880 if status.state == JobState::Succeeded {
881 if let Ok(mut guard) = completed_flag.lock() {
882 *guard = true;
883 }
884 }
885 }));
886
887 let handle = orchestrator
888 .enqueue(JobRequest::new(
889 "refresh_dashboard",
890 json!({"account_id": 7}),
891 "refresh-7",
892 ))
893 .unwrap();
894 orchestrator
895 .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
896 .unwrap();
897 let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
898 assert_eq!(status.state, JobState::Succeeded);
899 assert_eq!(
900 status.result,
901 Some(json!({
902 "rows_synced": 18
903 }))
904 );
905 assert!(*completed.lock().unwrap());
906 }
907}