Skip to main content

shelly_data/
integrations.rs

1use crate::{
2    error::{DataError, DataResult},
3    query::Query,
4    repo::{Row, StoredRow},
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9    collections::{BTreeMap, HashMap},
10    fmt,
11    sync::{Arc, Mutex},
12    thread,
13    time::{Duration, Instant},
14};
15use tracing::{info, warn};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum IntegrationErrorKind {
20    Transient,
21    Permanent,
22    Auth,
23    RateLimited,
24    Timeout,
25    Unavailable,
26    InvalidInput,
27}
28
29#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
30pub struct IntegrationError {
31    pub source: String,
32    pub kind: IntegrationErrorKind,
33    pub message: String,
34    pub code: Option<String>,
35    pub retryable: bool,
36}
37
38impl IntegrationError {
39    pub fn new(
40        source: impl Into<String>,
41        kind: IntegrationErrorKind,
42        message: impl Into<String>,
43    ) -> Self {
44        let kind_value = kind;
45        Self {
46            source: source.into(),
47            kind: kind_value,
48            message: message.into(),
49            code: None,
50            retryable: matches!(
51                kind_value,
52                IntegrationErrorKind::Transient
53                    | IntegrationErrorKind::RateLimited
54                    | IntegrationErrorKind::Timeout
55                    | IntegrationErrorKind::Unavailable
56            ),
57        }
58    }
59
60    pub fn with_code(mut self, code: impl Into<String>) -> Self {
61        self.code = Some(code.into());
62        self
63    }
64}
65
66impl fmt::Display for IntegrationError {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        if let Some(code) = &self.code {
69            write!(f, "[{}:{}] {}", self.source, code, self.message)
70        } else {
71            write!(f, "[{}] {}", self.source, self.message)
72        }
73    }
74}
75
76impl std::error::Error for IntegrationError {}
77
78pub type IntegrationResult<T> = Result<T, IntegrationError>;
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
81pub struct RetryPolicy {
82    pub max_attempts: u32,
83    pub initial_backoff_ms: u64,
84    pub max_backoff_ms: u64,
85}
86
87impl RetryPolicy {
88    pub fn conservative() -> Self {
89        Self {
90            max_attempts: 3,
91            initial_backoff_ms: 50,
92            max_backoff_ms: 500,
93        }
94    }
95
96    pub fn never() -> Self {
97        Self {
98            max_attempts: 1,
99            initial_backoff_ms: 0,
100            max_backoff_ms: 0,
101        }
102    }
103}
104
105impl Default for RetryPolicy {
106    fn default() -> Self {
107        Self::conservative()
108    }
109}
110
111pub fn run_with_retry<T, F>(policy: RetryPolicy, mut operation: F) -> IntegrationResult<T>
112where
113    F: FnMut(u32) -> IntegrationResult<T>,
114{
115    let attempts = policy.max_attempts.max(1);
116    let mut backoff_ms = policy.initial_backoff_ms;
117
118    for attempt in 1..=attempts {
119        match operation(attempt) {
120            Ok(value) => return Ok(value),
121            Err(err) if !err.retryable || attempt == attempts => return Err(err),
122            Err(_) => {
123                if backoff_ms > 0 {
124                    thread::sleep(Duration::from_millis(backoff_ms));
125                }
126                backoff_ms = (backoff_ms.saturating_mul(2)).min(policy.max_backoff_ms.max(1));
127            }
128        }
129    }
130
131    Err(IntegrationError::new(
132        "retry",
133        IntegrationErrorKind::Transient,
134        "retry exhausted",
135    ))
136}
137
138pub trait ConnectionLifecycleHook: Send + Sync {
139    fn on_connect(&self) -> IntegrationResult<()> {
140        Ok(())
141    }
142
143    fn on_disconnect(&self) -> IntegrationResult<()> {
144        Ok(())
145    }
146}
147
148pub trait ConnectionLifecycle {
149    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>);
150    fn connect(&self) -> IntegrationResult<()>;
151    fn disconnect(&self) -> IntegrationResult<()>;
152}
153
154#[derive(Default)]
155pub struct LifecycleHooks {
156    hooks: Vec<Arc<dyn ConnectionLifecycleHook>>,
157}
158
159impl LifecycleHooks {
160    pub fn new() -> Self {
161        Self::default()
162    }
163}
164
165impl ConnectionLifecycle for LifecycleHooks {
166    fn register_hook(&mut self, hook: Arc<dyn ConnectionLifecycleHook>) {
167        self.hooks.push(hook);
168    }
169
170    fn connect(&self) -> IntegrationResult<()> {
171        for hook in &self.hooks {
172            hook.on_connect()?;
173        }
174        Ok(())
175    }
176
177    fn disconnect(&self) -> IntegrationResult<()> {
178        for hook in &self.hooks {
179            hook.on_disconnect()?;
180        }
181        Ok(())
182    }
183}
184
185#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
186pub struct QueryContext {
187    pub tenant_id: Option<String>,
188    pub trace_id: Option<String>,
189    pub tags: BTreeMap<String, String>,
190}
191
192pub trait TypedQueryBoundary {
193    type Request: Send + Sync;
194    type Response: Send + Sync;
195
196    fn execute(
197        &self,
198        request: &Self::Request,
199        context: &QueryContext,
200    ) -> IntegrationResult<Self::Response>;
201}
202
203#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
204pub struct SqlCommand {
205    pub statement: String,
206    pub params: Vec<Value>,
207}
208
209impl SqlCommand {
210    pub fn new(statement: impl Into<String>, params: Vec<Value>) -> Self {
211        Self {
212            statement: statement.into(),
213            params,
214        }
215    }
216}
217
218pub trait SingleStoreAdapter:
219    TypedQueryBoundary<Request = SqlCommand, Response = Vec<Row>>
220{
221    fn run_query(&self, query: SqlCommand, context: &QueryContext) -> IntegrationResult<Vec<Row>> {
222        let started_at = Instant::now();
223        let statement = query.statement.as_str();
224        let param_count = query.params.len();
225        let result = self.execute(&query, context);
226        match &result {
227            Ok(rows) => info!(
228                target: "shelly.integration.query",
229                source = "singlestore",
230                operation = "run_query",
231                tenant_id = ?context.tenant_id,
232                trace_id = ?context.trace_id,
233                tag_count = context.tags.len(),
234                statement,
235                param_count,
236                row_count = rows.len(),
237                duration_ms = started_at.elapsed().as_millis() as u64,
238                "Shelly integration query executed"
239            ),
240            Err(err) => warn!(
241                target: "shelly.integration.query",
242                source = "singlestore",
243                operation = "run_query",
244                tenant_id = ?context.tenant_id,
245                trace_id = ?context.trace_id,
246                tag_count = context.tags.len(),
247                statement,
248                param_count,
249                duration_ms = started_at.elapsed().as_millis() as u64,
250                error = %err,
251                "Shelly integration query failed"
252            ),
253        }
254        result
255    }
256}
257
258#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
259pub struct SearchRequest {
260    pub index: String,
261    pub text: String,
262    pub filters: Vec<crate::query::Filter>,
263    pub page: usize,
264    pub per_page: usize,
265}
266
267impl SearchRequest {
268    pub fn new(index: impl Into<String>, text: impl Into<String>) -> Self {
269        Self {
270            index: index.into(),
271            text: text.into(),
272            filters: Vec::new(),
273            page: 1,
274            per_page: 25,
275        }
276    }
277
278    pub fn with_filter(mut self, filter: crate::query::Filter) -> Self {
279        self.filters.push(filter);
280        self
281    }
282}
283
284#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
285pub struct SearchResponse {
286    pub total_hits: usize,
287    pub rows: Vec<StoredRow>,
288}
289
290pub trait OpenSearchAdapter:
291    TypedQueryBoundary<Request = SearchRequest, Response = SearchResponse>
292{
293    fn search(
294        &self,
295        request: SearchRequest,
296        context: &QueryContext,
297    ) -> IntegrationResult<SearchResponse> {
298        let started_at = Instant::now();
299        let index = request.index.as_str();
300        let text = request.text.as_str();
301        let filter_count = request.filters.len();
302        let page = request.page;
303        let per_page = request.per_page;
304        let result = self.execute(&request, context);
305        match &result {
306            Ok(response) => info!(
307                target: "shelly.integration.query",
308                source = "opensearch",
309                operation = "search",
310                tenant_id = ?context.tenant_id,
311                trace_id = ?context.trace_id,
312                tag_count = context.tags.len(),
313                index,
314                text,
315                filter_count,
316                page,
317                per_page,
318                row_count = response.rows.len(),
319                total_hits = response.total_hits,
320                duration_ms = started_at.elapsed().as_millis() as u64,
321                "Shelly integration query executed"
322            ),
323            Err(err) => warn!(
324                target: "shelly.integration.query",
325                source = "opensearch",
326                operation = "search",
327                tenant_id = ?context.tenant_id,
328                trace_id = ?context.trace_id,
329                tag_count = context.tags.len(),
330                index,
331                text,
332                filter_count,
333                page,
334                per_page,
335                duration_ms = started_at.elapsed().as_millis() as u64,
336                error = %err,
337                "Shelly integration query failed"
338            ),
339        }
340        result
341    }
342}
343
344pub trait AnalyticsSink: Send + Sync {
345    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()>;
346}
347
348#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
349pub struct AnalyticsEvent {
350    pub namespace: String,
351    pub name: String,
352    pub payload: Value,
353}
354
355impl AnalyticsEvent {
356    pub fn new(namespace: impl Into<String>, name: impl Into<String>, payload: Value) -> Self {
357        Self {
358            namespace: namespace.into(),
359            name: name.into(),
360            payload,
361        }
362    }
363}
364
365#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
366pub struct JobRequest {
367    pub workflow: String,
368    pub payload: Value,
369    pub idempotency_key: String,
370    pub metadata: BTreeMap<String, String>,
371}
372
373impl JobRequest {
374    pub fn new(
375        workflow: impl Into<String>,
376        payload: Value,
377        idempotency_key: impl Into<String>,
378    ) -> Self {
379        Self {
380            workflow: workflow.into(),
381            payload,
382            idempotency_key: idempotency_key.into(),
383            metadata: BTreeMap::new(),
384        }
385    }
386}
387
388#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
389#[serde(rename_all = "snake_case")]
390pub enum JobState {
391    Queued,
392    Running,
393    Succeeded,
394    Failed,
395    Canceled,
396}
397
398#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
399pub struct JobStatus {
400    pub id: String,
401    pub state: JobState,
402    pub attempts: u32,
403    pub result: Option<Value>,
404    pub error: Option<IntegrationError>,
405}
406
407#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
408pub struct JobHandle {
409    pub id: String,
410    pub workflow: String,
411    pub idempotency_key: String,
412}
413
414pub type JobCompletionCallback = Arc<dyn Fn(&JobStatus) + Send + Sync>;
415
416pub trait JobOrchestrator: Send + Sync {
417    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle>;
418    fn status(&self, id: &str) -> IntegrationResult<JobStatus>;
419    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus>;
420    fn register_completion_callback(&self, callback: JobCompletionCallback);
421}
422
423#[derive(Debug, Clone, Default)]
424pub struct InMemorySingleStoreAdapter {
425    rows: Vec<Row>,
426}
427
428impl InMemorySingleStoreAdapter {
429    pub fn new(rows: Vec<Row>) -> Self {
430        Self { rows }
431    }
432}
433
434impl TypedQueryBoundary for InMemorySingleStoreAdapter {
435    type Request = SqlCommand;
436    type Response = Vec<Row>;
437
438    fn execute(
439        &self,
440        request: &Self::Request,
441        _context: &QueryContext,
442    ) -> IntegrationResult<Self::Response> {
443        if request.statement.trim().is_empty() {
444            return Err(IntegrationError::new(
445                "singlestore",
446                IntegrationErrorKind::InvalidInput,
447                "empty SQL statement",
448            )
449            .with_code("empty_statement"));
450        }
451        Ok(self.rows.clone())
452    }
453}
454
455impl SingleStoreAdapter for InMemorySingleStoreAdapter {}
456
457#[derive(Debug, Clone, Default)]
458pub struct InMemoryOpenSearchAdapter {
459    rows: Vec<StoredRow>,
460}
461
462impl InMemoryOpenSearchAdapter {
463    pub fn new(rows: Vec<StoredRow>) -> Self {
464        Self { rows }
465    }
466}
467
468impl TypedQueryBoundary for InMemoryOpenSearchAdapter {
469    type Request = SearchRequest;
470    type Response = SearchResponse;
471
472    fn execute(
473        &self,
474        request: &Self::Request,
475        _context: &QueryContext,
476    ) -> IntegrationResult<Self::Response> {
477        if request.index.trim().is_empty() {
478            return Err(IntegrationError::new(
479                "opensearch",
480                IntegrationErrorKind::InvalidInput,
481                "search index must not be empty",
482            )
483            .with_code("empty_index"));
484        }
485        let needle = request.text.trim().to_lowercase();
486        let mut filtered = self
487            .rows
488            .iter()
489            .filter(|row| {
490                if needle.is_empty() {
491                    return true;
492                }
493                row.data.values().any(|value| {
494                    value
495                        .as_str()
496                        .map(|text| text.to_lowercase().contains(&needle))
497                        .unwrap_or(false)
498                })
499            })
500            .cloned()
501            .collect::<Vec<_>>();
502        let total_hits = filtered.len();
503        let page = request.page.max(1);
504        let per_page = request.per_page.max(1);
505        let offset = (page - 1) * per_page;
506        filtered = filtered.into_iter().skip(offset).take(per_page).collect();
507        Ok(SearchResponse {
508            total_hits,
509            rows: filtered,
510        })
511    }
512}
513
514impl OpenSearchAdapter for InMemoryOpenSearchAdapter {}
515
516#[derive(Debug, Default, Clone)]
517pub struct InMemoryAxiomSink {
518    events: Arc<Mutex<Vec<AnalyticsEvent>>>,
519}
520
521impl InMemoryAxiomSink {
522    pub fn events(&self) -> Vec<AnalyticsEvent> {
523        self.events
524            .lock()
525            .map(|events| events.clone())
526            .unwrap_or_default()
527    }
528}
529
530impl AnalyticsSink for InMemoryAxiomSink {
531    fn send_event(&self, event: AnalyticsEvent) -> IntegrationResult<()> {
532        let started_at = Instant::now();
533        let namespace = event.namespace.clone();
534        let name = event.name.clone();
535        self.events
536            .lock()
537            .map_err(|_| {
538                IntegrationError::new(
539                    "axiom",
540                    IntegrationErrorKind::Unavailable,
541                    "analytics sink lock poisoned",
542                )
543            })?
544            .push(event);
545        info!(
546            target: "shelly.integration.query",
547            source = "axiom",
548            operation = "send_event",
549            namespace,
550            event_name = name,
551            duration_ms = started_at.elapsed().as_millis() as u64,
552            "Shelly integration call executed"
553        );
554        Ok(())
555    }
556}
557
558#[derive(Default)]
559pub struct InMemoryJobOrchestrator {
560    statuses: Mutex<HashMap<String, JobStatus>>,
561    callbacks: Mutex<Vec<JobCompletionCallback>>,
562    next_id: Mutex<u64>,
563}
564
565impl InMemoryJobOrchestrator {
566    pub fn mark_succeeded(&self, id: &str, result: Value) -> IntegrationResult<()> {
567        let status = self.with_status_mut(id, |status| {
568            status.state = JobState::Succeeded;
569            status.result = Some(result);
570            status.error = None;
571        })?;
572        self.notify(&status);
573        Ok(())
574    }
575
576    pub fn mark_failed(&self, id: &str, error: IntegrationError) -> IntegrationResult<()> {
577        let status = self.with_status_mut(id, |status| {
578            status.state = JobState::Failed;
579            status.result = None;
580            status.error = Some(error);
581        })?;
582        self.notify(&status);
583        Ok(())
584    }
585
586    fn with_status_mut<F>(&self, id: &str, update: F) -> IntegrationResult<JobStatus>
587    where
588        F: FnOnce(&mut JobStatus),
589    {
590        let mut statuses = self.statuses.lock().map_err(|_| {
591            IntegrationError::new(
592                "trigger",
593                IntegrationErrorKind::Unavailable,
594                "job status lock poisoned",
595            )
596        })?;
597        let Some(status) = statuses.get_mut(id) else {
598            return Err(IntegrationError::new(
599                "trigger",
600                IntegrationErrorKind::InvalidInput,
601                "job not found",
602            )
603            .with_code("job_not_found"));
604        };
605        status.attempts = status.attempts.saturating_add(1);
606        update(status);
607        Ok(status.clone())
608    }
609
610    fn notify(&self, status: &JobStatus) {
611        if let Ok(callbacks) = self.callbacks.lock() {
612            for callback in callbacks.iter() {
613                callback(status);
614            }
615        }
616    }
617
618    fn is_terminal(state: JobState) -> bool {
619        matches!(
620            state,
621            JobState::Succeeded | JobState::Failed | JobState::Canceled
622        )
623    }
624}
625
626impl JobOrchestrator for InMemoryJobOrchestrator {
627    fn enqueue(&self, request: JobRequest) -> IntegrationResult<JobHandle> {
628        let started_at = Instant::now();
629        let workflow = request.workflow.clone();
630        let idempotency_key = request.idempotency_key.clone();
631        let mut next_id = self.next_id.lock().map_err(|_| {
632            IntegrationError::new(
633                "trigger",
634                IntegrationErrorKind::Unavailable,
635                "job id lock poisoned",
636            )
637        })?;
638        *next_id = next_id.saturating_add(1);
639        let id = format!("job-{next_id}");
640        let status = JobStatus {
641            id: id.clone(),
642            state: JobState::Queued,
643            attempts: 0,
644            result: None,
645            error: None,
646        };
647        self.statuses
648            .lock()
649            .map_err(|_| {
650                IntegrationError::new(
651                    "trigger",
652                    IntegrationErrorKind::Unavailable,
653                    "job status lock poisoned",
654                )
655            })?
656            .insert(id.clone(), status);
657        let handle = JobHandle {
658            id,
659            workflow: request.workflow,
660            idempotency_key: request.idempotency_key,
661        };
662        info!(
663            target: "shelly.integration.query",
664            source = "trigger",
665            operation = "enqueue",
666            workflow,
667            idempotency_key,
668            job_id = handle.id.as_str(),
669            duration_ms = started_at.elapsed().as_millis() as u64,
670            "Shelly integration call executed"
671        );
672        Ok(handle)
673    }
674
675    fn status(&self, id: &str) -> IntegrationResult<JobStatus> {
676        let started_at = Instant::now();
677        let result = self
678            .statuses
679            .lock()
680            .map_err(|_| {
681                IntegrationError::new(
682                    "trigger",
683                    IntegrationErrorKind::Unavailable,
684                    "job status lock poisoned",
685                )
686            })?
687            .get(id)
688            .cloned()
689            .ok_or_else(|| {
690                IntegrationError::new(
691                    "trigger",
692                    IntegrationErrorKind::InvalidInput,
693                    "job not found",
694                )
695                .with_code("job_not_found")
696            });
697        match &result {
698            Ok(status) => info!(
699                target: "shelly.integration.query",
700                source = "trigger",
701                operation = "status",
702                job_id = id,
703                job_state = ?status.state,
704                attempts = status.attempts,
705                duration_ms = started_at.elapsed().as_millis() as u64,
706                "Shelly integration call executed"
707            ),
708            Err(err) => warn!(
709                target: "shelly.integration.query",
710                source = "trigger",
711                operation = "status",
712                job_id = id,
713                duration_ms = started_at.elapsed().as_millis() as u64,
714                error = %err,
715                "Shelly integration call failed"
716            ),
717        }
718        result
719    }
720
721    fn poll(&self, id: &str, attempts: u32, backoff_ms: u64) -> IntegrationResult<JobStatus> {
722        let started_at = Instant::now();
723        let attempts = attempts.max(1);
724        let mut polled = 0u32;
725        for current in 1..=attempts {
726            let status = self.status(id)?;
727            polled = current;
728            if Self::is_terminal(status.state) || current == attempts {
729                info!(
730                    target: "shelly.integration.query",
731                    source = "trigger",
732                    operation = "poll",
733                    job_id = id,
734                    attempts = polled,
735                    backoff_ms,
736                    terminal = Self::is_terminal(status.state),
737                    job_state = ?status.state,
738                    duration_ms = started_at.elapsed().as_millis() as u64,
739                    "Shelly integration call executed"
740                );
741                return Ok(status);
742            }
743            if backoff_ms > 0 {
744                thread::sleep(Duration::from_millis(backoff_ms));
745            }
746        }
747
748        let result = self.status(id);
749        match &result {
750            Ok(status) => info!(
751                target: "shelly.integration.query",
752                source = "trigger",
753                operation = "poll",
754                job_id = id,
755                attempts = polled,
756                backoff_ms,
757                terminal = Self::is_terminal(status.state),
758                job_state = ?status.state,
759                duration_ms = started_at.elapsed().as_millis() as u64,
760                "Shelly integration call executed"
761            ),
762            Err(err) => warn!(
763                target: "shelly.integration.query",
764                source = "trigger",
765                operation = "poll",
766                job_id = id,
767                attempts = polled,
768                backoff_ms,
769                duration_ms = started_at.elapsed().as_millis() as u64,
770                error = %err,
771                "Shelly integration call failed"
772            ),
773        }
774        result
775    }
776
777    fn register_completion_callback(&self, callback: JobCompletionCallback) {
778        if let Ok(mut callbacks) = self.callbacks.lock() {
779            callbacks.push(callback);
780            info!(
781                target: "shelly.integration.query",
782                source = "trigger",
783                operation = "register_completion_callback",
784                callback_count = callbacks.len(),
785                "Shelly integration callback registered"
786            );
787        } else {
788            warn!(
789                target: "shelly.integration.query",
790                source = "trigger",
791                operation = "register_completion_callback",
792                "Shelly integration callback registration failed"
793            );
794        }
795    }
796}
797
798pub fn map_integration_error(source: impl Into<String>, err: IntegrationError) -> DataError {
799    DataError::Integration(format!("[{}] {}", source.into(), err))
800}
801
802pub fn map_integration_result<T>(
803    source: impl Into<String>,
804    result: IntegrationResult<T>,
805) -> DataResult<T> {
806    result.map_err(|err| map_integration_error(source, err))
807}
808
809pub fn query_from_search(request: &SearchRequest) -> Query {
810    let mut query = Query::new().paginate(request.page, request.per_page);
811    for filter in &request.filters {
812        query = query.where_filter(filter.clone());
813    }
814    query
815}
816
817#[cfg(test)]
818mod tests {
819    use super::{
820        map_integration_result, run_with_retry, AnalyticsEvent, AnalyticsSink, ConnectionLifecycle,
821        ConnectionLifecycleHook, InMemoryAxiomSink, InMemoryJobOrchestrator,
822        InMemoryOpenSearchAdapter, InMemorySingleStoreAdapter, IntegrationError,
823        IntegrationErrorKind, JobHandle, JobOrchestrator, JobRequest, JobState, JobStatus,
824        LifecycleHooks, OpenSearchAdapter, QueryContext, RetryPolicy, SearchRequest,
825        SingleStoreAdapter, SqlCommand,
826    };
827    use serde_json::{json, Value};
828    use std::sync::{Arc, Mutex};
829
830    struct CountingHook {
831        connects: Arc<Mutex<u32>>,
832        disconnects: Arc<Mutex<u32>>,
833    }
834
835    impl ConnectionLifecycleHook for CountingHook {
836        fn on_connect(&self) -> super::IntegrationResult<()> {
837            let mut guard = self.connects.lock().unwrap();
838            *guard += 1;
839            Ok(())
840        }
841
842        fn on_disconnect(&self) -> super::IntegrationResult<()> {
843            let mut guard = self.disconnects.lock().unwrap();
844            *guard += 1;
845            Ok(())
846        }
847    }
848
849    #[test]
850    fn lifecycle_hooks_are_called() {
851        let connects = Arc::new(Mutex::new(0));
852        let disconnects = Arc::new(Mutex::new(0));
853        let mut lifecycle = LifecycleHooks::new();
854        lifecycle.register_hook(Arc::new(CountingHook {
855            connects: connects.clone(),
856            disconnects: disconnects.clone(),
857        }));
858
859        lifecycle.connect().unwrap();
860        lifecycle.disconnect().unwrap();
861        assert_eq!(*connects.lock().unwrap(), 1);
862        assert_eq!(*disconnects.lock().unwrap(), 1);
863    }
864
865    #[test]
866    fn retry_policy_retries_transient_errors() {
867        let mut calls = 0u32;
868        let result = run_with_retry(RetryPolicy::conservative(), |attempt| {
869            calls = attempt;
870            if attempt < 3 {
871                Err(IntegrationError::new(
872                    "opensearch",
873                    IntegrationErrorKind::Transient,
874                    "temporary failure",
875                ))
876            } else {
877                Ok("ok")
878            }
879        })
880        .unwrap();
881
882        assert_eq!(result, "ok");
883        assert_eq!(calls, 3);
884    }
885
886    #[test]
887    fn retry_policy_stops_on_permanent_errors() {
888        let mut calls = 0u32;
889        let err = run_with_retry(RetryPolicy::conservative(), |attempt| {
890            calls = attempt;
891            Err::<(), IntegrationError>(IntegrationError::new(
892                "singlestore",
893                IntegrationErrorKind::Permanent,
894                "invalid sql",
895            ))
896        })
897        .unwrap_err();
898
899        assert_eq!(calls, 1);
900        assert_eq!(err.kind, IntegrationErrorKind::Permanent);
901    }
902
903    #[test]
904    fn integration_result_maps_into_data_error() {
905        let mapped = map_integration_result::<()>(
906            "trigger",
907            Err(IntegrationError::new(
908                "trigger",
909                IntegrationErrorKind::Unavailable,
910                "service unavailable",
911            )),
912        )
913        .unwrap_err();
914
915        assert!(mapped.to_string().contains("service unavailable"));
916    }
917
918    #[derive(Default)]
919    struct InMemoryJobs {
920        statuses: Arc<Mutex<Vec<JobStatus>>>,
921        callbacks: Arc<Mutex<Vec<super::JobCompletionCallback>>>,
922    }
923
924    impl JobOrchestrator for InMemoryJobs {
925        fn enqueue(&self, request: JobRequest) -> super::IntegrationResult<JobHandle> {
926            let id = format!("job-{}", request.idempotency_key);
927            self.statuses.lock().unwrap().push(JobStatus {
928                id: id.clone(),
929                state: JobState::Queued,
930                attempts: 0,
931                result: None,
932                error: None,
933            });
934            Ok(JobHandle {
935                id,
936                workflow: request.workflow,
937                idempotency_key: request.idempotency_key,
938            })
939        }
940
941        fn status(&self, id: &str) -> super::IntegrationResult<JobStatus> {
942            self.statuses
943                .lock()
944                .unwrap()
945                .iter()
946                .find(|status| status.id == id)
947                .cloned()
948                .ok_or_else(|| {
949                    IntegrationError::new(
950                        "trigger",
951                        IntegrationErrorKind::InvalidInput,
952                        "job not found",
953                    )
954                })
955        }
956
957        fn poll(
958            &self,
959            id: &str,
960            _attempts: u32,
961            _backoff_ms: u64,
962        ) -> super::IntegrationResult<JobStatus> {
963            self.status(id)
964        }
965
966        fn register_completion_callback(&self, callback: super::JobCompletionCallback) {
967            self.callbacks.lock().unwrap().push(callback);
968        }
969    }
970
971    #[test]
972    fn job_orchestration_contract_supports_enqueue_and_status() {
973        let jobs = InMemoryJobs::default();
974        let handle = jobs
975            .enqueue(JobRequest::new(
976                "sync_customer",
977                json!({"id": 42}),
978                "idempotent-42",
979            ))
980            .unwrap();
981        let status = jobs.status(&handle.id).unwrap();
982        assert_eq!(status.state, JobState::Queued);
983        assert_eq!(handle.idempotency_key, "idempotent-42");
984
985        let ctx = QueryContext {
986            tenant_id: Some("tenant-a".to_string()),
987            ..QueryContext::default()
988        };
989        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
990    }
991
992    #[test]
993    fn reference_singlestore_adapter_runs_typed_sql_boundary() {
994        let adapter = InMemorySingleStoreAdapter::new(vec![std::collections::BTreeMap::from([(
995            "region".to_string(),
996            Value::String("EMEA".to_string()),
997        )])]);
998        let rows = adapter
999            .run_query(
1000                SqlCommand::new("SELECT region FROM accounts", Vec::new()),
1001                &QueryContext::default(),
1002            )
1003            .unwrap();
1004        assert_eq!(rows.len(), 1);
1005        assert_eq!(
1006            rows[0].get("region"),
1007            Some(&Value::String("EMEA".to_string()))
1008        );
1009    }
1010
1011    #[test]
1012    fn reference_opensearch_adapter_filters_rows() {
1013        let rows = vec![
1014            crate::StoredRow {
1015                id: 1,
1016                data: std::collections::BTreeMap::from([(
1017                    "title".to_string(),
1018                    Value::String("Acme renewal".to_string()),
1019                )]),
1020            },
1021            crate::StoredRow {
1022                id: 2,
1023                data: std::collections::BTreeMap::from([(
1024                    "title".to_string(),
1025                    Value::String("Globex onboarding".to_string()),
1026                )]),
1027            },
1028        ];
1029        let adapter = InMemoryOpenSearchAdapter::new(rows);
1030        let response = adapter
1031            .search(
1032                SearchRequest::new("accounts", "renewal"),
1033                &QueryContext::default(),
1034            )
1035            .unwrap();
1036        assert_eq!(response.total_hits, 1);
1037        assert_eq!(response.rows[0].id, 1);
1038    }
1039
1040    #[test]
1041    fn reference_axiom_sink_records_events() {
1042        let sink = InMemoryAxiomSink::default();
1043        sink.send_event(AnalyticsEvent::new(
1044            "sales",
1045            "query_executed",
1046            json!({"latency_ms": 12}),
1047        ))
1048        .unwrap();
1049        let events = sink.events();
1050        assert_eq!(events.len(), 1);
1051        assert_eq!(events[0].name, "query_executed");
1052    }
1053
1054    #[test]
1055    fn reference_trigger_orchestrator_supports_completion_and_polling() {
1056        let orchestrator = InMemoryJobOrchestrator::default();
1057        let completed = Arc::new(Mutex::new(false));
1058        let completed_flag = completed.clone();
1059        orchestrator.register_completion_callback(Arc::new(move |status| {
1060            if status.state == JobState::Succeeded {
1061                if let Ok(mut guard) = completed_flag.lock() {
1062                    *guard = true;
1063                }
1064            }
1065        }));
1066
1067        let handle = orchestrator
1068            .enqueue(JobRequest::new(
1069                "refresh_dashboard",
1070                json!({"account_id": 7}),
1071                "refresh-7",
1072            ))
1073            .unwrap();
1074        orchestrator
1075            .mark_succeeded(&handle.id, json!({"rows_synced": 18}))
1076            .unwrap();
1077        let status = orchestrator.poll(&handle.id, 2, 0).unwrap();
1078        assert_eq!(status.state, JobState::Succeeded);
1079        assert_eq!(
1080            status.result,
1081            Some(json!({
1082                "rows_synced": 18
1083            }))
1084        );
1085        assert!(*completed.lock().unwrap());
1086    }
1087}