Skip to main content

alembic_adapter_generic/
lib.rs

1//! generic rest adapter for alembic.
2
3use alembic_core::{JsonMap, Key, Schema, TypeName, Uid};
4use alembic_engine::{
5    apply_non_delete_with_retries, Adapter, AdapterApplyError, AppliedOp, ApplyReport, BackendId,
6    ObservedObject, ObservedState, Op, RetryApplyDriver,
7};
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, BTreeSet};
12
13/// configuration for the generic rest adapter.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct GenericConfig {
16    /// base url for the api.
17    pub base_url: String,
18    /// authentication headers.
19    #[serde(default)]
20    pub headers: BTreeMap<String, String>,
21    /// type-to-endpoint mappings.
22    pub types: BTreeMap<String, EndpointConfig>,
23}
24
25/// endpoint configuration for a specific type.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EndpointConfig {
28    /// path for listing and creating objects.
29    pub path: String,
30    /// json path to the results array in the list response (default: root).
31    pub results_path: Option<String>,
32    /// json path to the object id (default: "id").
33    #[serde(default = "default_id_path")]
34    pub id_path: String,
35    /// strategy for deletions.
36    #[serde(default)]
37    pub delete_strategy: DeleteStrategy,
38    /// method for updates (default: PATCH).
39    #[serde(default = "default_update_method")]
40    pub update_method: String,
41}
42
43fn default_id_path() -> String {
44    "id".to_string()
45}
46
47fn default_update_method() -> String {
48    "PATCH".to_string()
49}
50
51/// strategy for deleting objects.
52#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub enum DeleteStrategy {
55    /// deletes are not supported for this type.
56    #[default]
57    None,
58    /// delete via DELETE method to path + id.
59    Standard,
60}
61
62pub struct GenericAdapter {
63    config: GenericConfig,
64    client: reqwest::Client,
65}
66
67impl GenericAdapter {
68    pub fn new(config: GenericConfig) -> Result<Self> {
69        let mut headers = reqwest::header::HeaderMap::new();
70        for (k, v) in &config.headers {
71            let name = reqwest::header::HeaderName::from_bytes(k.as_bytes())?;
72            let value = reqwest::header::HeaderValue::from_str(v)?;
73            headers.insert(name, value);
74        }
75
76        let client = reqwest::Client::builder()
77            .default_headers(headers)
78            .build()?;
79
80        for (type_name, endpoint) in &config.types {
81            match endpoint.update_method.as_str() {
82                "PATCH" | "PUT" => {}
83                other => {
84                    return Err(anyhow!(
85                        "invalid update_method {:?} for type {} (expected PATCH or PUT)",
86                        other,
87                        type_name
88                    ));
89                }
90            }
91        }
92
93        Ok(Self { config, client })
94    }
95
96    async fn apply_create(
97        &self,
98        uid: Uid,
99        type_name: &TypeName,
100        desired: &alembic_core::Object,
101        schema: &Schema,
102        resolved: &mut BTreeMap<Uid, BackendId>,
103    ) -> Result<AppliedOp> {
104        let endpoint = self
105            .config
106            .types
107            .get(type_name.as_str())
108            .ok_or_else(|| anyhow!("no config for {}", type_name))?;
109        let type_schema = schema
110            .types
111            .get(type_name.as_str())
112            .ok_or_else(|| anyhow!("missing schema for {}", type_name))?;
113
114        let url = format!(
115            "{}/{}",
116            self.config.base_url.trim_end_matches('/'),
117            endpoint.path.trim_start_matches('/')
118        );
119        let body = resolve_attrs(&desired.attrs, type_schema, resolved)?;
120
121        let resp = self
122            .client
123            .post(&url)
124            .json(&body)
125            .send()
126            .await?
127            .error_for_status()?;
128        let body: serde_json::Value = resp.json().await?;
129
130        let id_val = resolve_path(&body, &endpoint.id_path)?;
131        let backend_id = match id_val {
132            serde_json::Value::Number(n) => {
133                BackendId::Int(n.as_u64().ok_or_else(|| anyhow!("invalid integer id"))?)
134            }
135            serde_json::Value::String(s) => BackendId::String(s),
136            _ => return Err(anyhow!("id must be number or string")),
137        };
138        resolved.insert(uid, backend_id.clone());
139
140        Ok(AppliedOp {
141            uid,
142            type_name: type_name.clone(),
143            backend_id: Some(backend_id),
144        })
145    }
146
147    async fn apply_update(
148        &self,
149        uid: Uid,
150        type_name: &TypeName,
151        desired: &alembic_core::Object,
152        backend_id: Option<&BackendId>,
153        schema: &Schema,
154        resolved: &BTreeMap<Uid, BackendId>,
155    ) -> Result<AppliedOp> {
156        let endpoint = self
157            .config
158            .types
159            .get(type_name.as_str())
160            .ok_or_else(|| anyhow!("no config for {}", type_name))?;
161        let type_schema = schema
162            .types
163            .get(type_name.as_str())
164            .ok_or_else(|| anyhow!("missing schema for {}", type_name))?;
165
166        let id = backend_id.ok_or_else(|| anyhow!("update requires backend id"))?;
167        let url = self.backend_id_to_url(endpoint, id);
168        let body = resolve_attrs(&desired.attrs, type_schema, resolved)?;
169
170        let req = match endpoint.update_method.as_str() {
171            "PUT" => self.client.put(&url),
172            _ => self.client.patch(&url),
173        };
174
175        req.json(&body).send().await?.error_for_status()?;
176
177        Ok(AppliedOp {
178            uid,
179            type_name: type_name.clone(),
180            backend_id: Some(id.clone()),
181        })
182    }
183
184    async fn apply_delete(&self, type_name: &TypeName, id: &BackendId) -> Result<()> {
185        let endpoint = self
186            .config
187            .types
188            .get(type_name.as_str())
189            .ok_or_else(|| anyhow!("no config for {}", type_name))?;
190
191        match endpoint.delete_strategy {
192            DeleteStrategy::Standard => {
193                let url = self.backend_id_to_url(endpoint, id);
194                self.client.delete(&url).send().await?.error_for_status()?;
195            }
196            DeleteStrategy::None => {
197                return Err(anyhow!(
198                    "delete not supported for type {} (delete_strategy: none)",
199                    type_name
200                ));
201            }
202        }
203        Ok(())
204    }
205
206    fn backend_id_to_url(&self, endpoint: &EndpointConfig, id: &BackendId) -> String {
207        let id_str = match id {
208            BackendId::Int(n) => n.to_string(),
209            BackendId::String(s) => s.clone(),
210        };
211        format!(
212            "{}/{}/{}",
213            self.config.base_url.trim_end_matches('/'),
214            endpoint.path.trim_matches('/'),
215            id_str
216        )
217    }
218}
219
220#[async_trait]
221impl Adapter for GenericAdapter {
222    async fn read(
223        &self,
224        schema: &Schema,
225        types: &[TypeName],
226        state_store: &alembic_engine::StateStore,
227    ) -> Result<ObservedState> {
228        let mut state = ObservedState::default();
229        let mappings = state_mappings(state_store);
230        let requested: BTreeSet<TypeName> = if types.is_empty() {
231            self.config
232                .types
233                .keys()
234                .map(|s| TypeName::new(s.clone()))
235                .collect()
236        } else {
237            types.iter().cloned().collect()
238        };
239
240        let mut tasks = Vec::new();
241        for type_name in requested {
242            let endpoint = self
243                .config
244                .types
245                .get(type_name.as_str())
246                .ok_or_else(|| anyhow!("no generic config for type {}", type_name))?
247                .clone();
248            let type_schema = schema
249                .types
250                .get(type_name.as_str())
251                .ok_or_else(|| anyhow!("missing schema for {}", type_name))?
252                .clone();
253
254            let client = self.client.clone();
255            let base_url = self.config.base_url.clone();
256            let mappings = mappings.clone();
257
258            tasks.push(tokio::spawn(async move {
259                let url = format!(
260                    "{}/{}",
261                    base_url.trim_end_matches('/'),
262                    endpoint.path.trim_start_matches('/')
263                );
264                let resp = client.get(&url).send().await?.error_for_status()?;
265                let body: serde_json::Value = resp.json().await?;
266
267                let results = if let Some(path) = &endpoint.results_path {
268                    let val = resolve_path(&body, path)?;
269                    val.as_array()
270                        .ok_or_else(|| {
271                            anyhow!("expected array at path {} for {}", path, type_name)
272                        })?
273                        .clone()
274                } else if let Some(arr) = body.as_array() {
275                    arr.clone()
276                } else {
277                    return Err(anyhow!("expected array in list response for {}", type_name));
278                };
279
280                let mut observed = Vec::new();
281                for item in results {
282                    let id_val = resolve_path(&item, &endpoint.id_path)?;
283                    let backend_id = match id_val {
284                        serde_json::Value::Number(n) => {
285                            BackendId::Int(n.as_u64().ok_or_else(|| anyhow!("invalid integer id"))?)
286                        }
287                        serde_json::Value::String(s) => BackendId::String(s),
288                        _ => return Err(anyhow!("id must be number or string")),
289                    };
290
291                    let attrs = match item {
292                        serde_json::Value::Object(map) => {
293                            map.into_iter().collect::<BTreeMap<_, _>>().into()
294                        }
295                        _ => return Err(anyhow!("expected object in results")),
296                    };
297
298                    let attrs = normalize_attrs_refs(&attrs, &type_schema, &mappings);
299                    let key = build_key_from_schema(&type_schema, &attrs)?;
300
301                    observed.push(ObservedObject {
302                        type_name: type_name.clone(),
303                        key,
304                        attrs,
305                        backend_id: Some(backend_id),
306                    });
307                }
308                Ok::<Vec<ObservedObject>, anyhow::Error>(observed)
309            }));
310        }
311
312        let results = futures::future::join_all(tasks).await;
313        for result in results {
314            let objects = result??;
315            for object in objects {
316                state.insert(object);
317            }
318        }
319
320        Ok(state)
321    }
322
323    async fn write(
324        &self,
325        schema: &Schema,
326        ops: &[Op],
327        state: &alembic_engine::StateStore,
328    ) -> Result<ApplyReport> {
329        let mut applied = Vec::new();
330        let mut resolved = resolved_from_state(state);
331
332        let mut creates_updates = Vec::new();
333        let mut deletes = Vec::new();
334        for op in ops {
335            match op {
336                Op::Delete { .. } => deletes.push(op.clone()),
337                _ => creates_updates.push(op.clone()),
338            }
339        }
340
341        struct ApplyDriver<'a> {
342            adapter: &'a GenericAdapter,
343            resolved: &'a mut BTreeMap<Uid, BackendId>,
344            schema: &'a Schema,
345        }
346
347        #[async_trait]
348        impl RetryApplyDriver for ApplyDriver<'_> {
349            async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp> {
350                match op {
351                    Op::Create {
352                        uid,
353                        type_name,
354                        desired,
355                    } => {
356                        self.adapter
357                            .apply_create(*uid, type_name, desired, self.schema, self.resolved)
358                            .await
359                    }
360                    Op::Update {
361                        uid,
362                        type_name,
363                        desired,
364                        backend_id,
365                        ..
366                    } => {
367                        self.adapter
368                            .apply_update(
369                                *uid,
370                                type_name,
371                                desired,
372                                backend_id.as_ref(),
373                                self.schema,
374                                self.resolved,
375                            )
376                            .await
377                    }
378                    Op::Delete { .. } => unreachable!("delete ops filtered before retry"),
379                }
380            }
381
382            fn is_retryable(&self, err: &anyhow::Error) -> bool {
383                is_missing_ref_error(err)
384            }
385        }
386
387        let mut driver = ApplyDriver {
388            adapter: self,
389            resolved: &mut resolved,
390            schema,
391        };
392        let retry_result = apply_non_delete_with_retries(&creates_updates, &mut driver).await?;
393        if !retry_result.pending.is_empty() {
394            let missing = describe_missing_refs(&retry_result.pending, &resolved);
395            return Err(anyhow!("unresolved references: {missing}"));
396        }
397
398        for applied_op in retry_result.applied {
399            if let Some(backend_id) = &applied_op.backend_id {
400                resolved.insert(applied_op.uid, backend_id.clone());
401            }
402            applied.push(applied_op);
403        }
404
405        for op in deletes {
406            if let Op::Delete {
407                uid,
408                type_name,
409                backend_id,
410                ..
411            } = op
412            {
413                let id = backend_id.ok_or_else(|| anyhow!("delete requires backend id"))?;
414                self.apply_delete(&type_name, &id).await?;
415                applied.push(AppliedOp {
416                    uid,
417                    type_name,
418                    backend_id: None,
419                });
420            }
421        }
422
423        Ok(ApplyReport {
424            applied,
425            ..Default::default()
426        })
427    }
428}
429
430fn resolve_path(value: &serde_json::Value, path: &str) -> Result<serde_json::Value> {
431    let mut current = value;
432    for segment in path.split('.') {
433        if segment.is_empty() {
434            continue;
435        }
436        current = current
437            .get(segment)
438            .ok_or_else(|| anyhow!("path segment not found: {}", segment))?;
439    }
440    Ok(current.clone())
441}
442
443#[derive(Debug, Default, Clone)]
444struct StateMappings {
445    by_type: BTreeMap<String, BTreeMap<BackendId, Uid>>,
446}
447
448impl StateMappings {
449    fn uid_for(&self, type_name: &str, backend_id: &BackendId) -> Option<Uid> {
450        self.by_type
451            .get(type_name)
452            .and_then(|mapping| mapping.get(backend_id).copied())
453    }
454}
455
456fn state_mappings(state: &alembic_engine::StateStore) -> StateMappings {
457    let mut by_type = BTreeMap::new();
458    for (type_name, mapping) in state.all_mappings() {
459        let mut id_to_uid = BTreeMap::new();
460        for (uid, backend_id) in mapping {
461            id_to_uid.insert(backend_id.clone(), *uid);
462        }
463        by_type.insert(type_name.as_str().to_string(), id_to_uid);
464    }
465    StateMappings { by_type }
466}
467
468fn build_key_from_schema(type_schema: &alembic_core::TypeSchema, attrs: &JsonMap) -> Result<Key> {
469    let mut map = BTreeMap::new();
470    for field in type_schema.key.keys() {
471        let Some(value) = attrs.get(field) else {
472            return Err(anyhow!("missing key field {field}"));
473        };
474        map.insert(field.clone(), value.clone());
475    }
476    Ok(Key::from(map))
477}
478
479fn resolved_from_state(state: &alembic_engine::StateStore) -> BTreeMap<Uid, BackendId> {
480    let mut resolved = BTreeMap::new();
481    for mapping in state.all_mappings().values() {
482        for (uid, backend_id) in mapping {
483            resolved.insert(*uid, backend_id.clone());
484        }
485    }
486    resolved
487}
488
489fn normalize_attrs_refs(
490    attrs: &JsonMap,
491    type_schema: &alembic_core::TypeSchema,
492    mappings: &StateMappings,
493) -> JsonMap {
494    let mut normalized = attrs.clone();
495    for (field, schema) in &type_schema.fields {
496        match &schema.r#type {
497            alembic_core::FieldType::Ref { target } => {
498                if let Some(value) = attrs.get(field) {
499                    normalized.insert(
500                        field.clone(),
501                        normalize_ref_value(value.clone(), target, mappings),
502                    );
503                }
504            }
505            alembic_core::FieldType::ListRef { target } => {
506                if let Some(value) = attrs.get(field) {
507                    let updated = if let serde_json::Value::Array(items) = value {
508                        let mapped = items
509                            .iter()
510                            .cloned()
511                            .map(|item| normalize_ref_value(item, target, mappings))
512                            .collect::<Vec<_>>();
513                        serde_json::Value::Array(mapped)
514                    } else {
515                        value.clone()
516                    };
517                    normalized.insert(field.clone(), updated);
518                }
519            }
520            _ => {}
521        }
522    }
523    normalized
524}
525
526fn normalize_ref_value(
527    value: serde_json::Value,
528    target: &str,
529    mappings: &StateMappings,
530) -> serde_json::Value {
531    if value.is_null() {
532        return value;
533    }
534    let backend_id = match backend_id_from_value(&value) {
535        Some(id) => id,
536        None => return value,
537    };
538    mappings
539        .uid_for(target, &backend_id)
540        .map(|uid| serde_json::Value::String(uid.to_string()))
541        .unwrap_or(value)
542}
543
544fn backend_id_from_value(value: &serde_json::Value) -> Option<BackendId> {
545    match value {
546        serde_json::Value::Number(n) => n.as_u64().map(BackendId::Int).or_else(|| {
547            n.as_i64()
548                .and_then(|v| u64::try_from(v).ok())
549                .map(BackendId::Int)
550        }),
551        serde_json::Value::String(s) => Some(BackendId::String(s.clone())),
552        serde_json::Value::Object(map) => map.get("id").and_then(backend_id_from_value),
553        _ => None,
554    }
555}
556
557fn resolve_attrs(
558    attrs: &JsonMap,
559    type_schema: &alembic_core::TypeSchema,
560    resolved: &BTreeMap<Uid, BackendId>,
561) -> Result<serde_json::Value> {
562    let mut map = serde_json::Map::new();
563    for (key, value) in attrs.iter() {
564        let field_schema = type_schema
565            .fields
566            .get(key)
567            .ok_or_else(|| anyhow!("missing schema for field {key}"))?;
568        map.insert(
569            key.clone(),
570            resolve_value_for_type(&field_schema.r#type, value.clone(), resolved)?,
571        );
572    }
573    Ok(serde_json::Value::Object(map))
574}
575
576fn resolve_value_for_type(
577    field_type: &alembic_core::FieldType,
578    value: serde_json::Value,
579    resolved: &BTreeMap<Uid, BackendId>,
580) -> Result<serde_json::Value> {
581    match field_type {
582        alembic_core::FieldType::Ref { .. } => resolve_ref_value(value, resolved),
583        alembic_core::FieldType::ListRef { .. } => {
584            let serde_json::Value::Array(items) = value else {
585                return Err(anyhow!("expected array for list_ref"));
586            };
587            let mut out = Vec::new();
588            for item in items {
589                out.push(resolve_ref_value(item, resolved)?);
590            }
591            Ok(serde_json::Value::Array(out))
592        }
593        _ => Ok(value),
594    }
595}
596
597fn resolve_ref_value(
598    value: serde_json::Value,
599    resolved: &BTreeMap<Uid, BackendId>,
600) -> Result<serde_json::Value> {
601    let serde_json::Value::String(raw) = value else {
602        return Err(anyhow!("ref must be uuid string"));
603    };
604    let uid = Uid::parse_str(&raw).map_err(|_| anyhow!("invalid uuid: {}", raw))?;
605    let id = resolved
606        .get(&uid)
607        .ok_or(AdapterApplyError::MissingRef { uid })?;
608    Ok(match id {
609        BackendId::Int(n) => serde_json::Value::Number((*n).into()),
610        BackendId::String(s) => serde_json::Value::String(s.clone()),
611    })
612}
613
614fn is_missing_ref_error(err: &anyhow::Error) -> bool {
615    err.downcast_ref::<AdapterApplyError>()
616        .is_some_and(|e| matches!(e, AdapterApplyError::MissingRef { .. }))
617}
618
619fn describe_missing_refs(ops: &[Op], resolved: &BTreeMap<Uid, BackendId>) -> String {
620    let mut missing = BTreeSet::new();
621    for op in ops {
622        if let Op::Create { desired, .. } | Op::Update { desired, .. } = op {
623            for value in desired.attrs.values() {
624                collect_missing_refs(value, resolved, &mut missing);
625            }
626        }
627    }
628    missing
629        .into_iter()
630        .map(|uid| uid.to_string())
631        .collect::<Vec<_>>()
632        .join(", ")
633}
634
635fn collect_missing_refs(
636    value: &serde_json::Value,
637    resolved: &BTreeMap<Uid, BackendId>,
638    missing: &mut BTreeSet<Uid>,
639) {
640    match value {
641        serde_json::Value::String(raw) => {
642            if let Ok(uid) = Uid::parse_str(raw) {
643                if !resolved.contains_key(&uid) {
644                    missing.insert(uid);
645                }
646            }
647        }
648        serde_json::Value::Array(items) => {
649            for item in items {
650                collect_missing_refs(item, resolved, missing);
651            }
652        }
653        serde_json::Value::Object(map) => {
654            for value in map.values() {
655                collect_missing_refs(value, resolved, missing);
656            }
657        }
658        _ => {}
659    }
660}
661
662#[cfg(test)]
663mod tests;