1use alembic_core::{JsonMap, Key, Schema, TypeName, Uid};
4use alembic_engine::{
5 apply_non_delete_with_retries, Adapter, AdapterApplyError, AppliedOp, ApplyReport, BackendId,
6 ObservedObject, ObservedState, Op, RetryApplyDriver,
7};
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, BTreeSet};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct GenericConfig {
16 pub base_url: String,
18 #[serde(default)]
20 pub headers: BTreeMap<String, String>,
21 pub types: BTreeMap<String, EndpointConfig>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EndpointConfig {
28 pub path: String,
30 pub results_path: Option<String>,
32 #[serde(default = "default_id_path")]
34 pub id_path: String,
35 #[serde(default)]
37 pub delete_strategy: DeleteStrategy,
38 #[serde(default = "default_update_method")]
40 pub update_method: String,
41}
42
43fn default_id_path() -> String {
44 "id".to_string()
45}
46
47fn default_update_method() -> String {
48 "PATCH".to_string()
49}
50
51#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub enum DeleteStrategy {
55 #[default]
57 None,
58 Standard,
60}
61
62pub struct GenericAdapter {
63 config: GenericConfig,
64 client: reqwest::Client,
65}
66
67impl GenericAdapter {
68 pub fn new(config: GenericConfig) -> Result<Self> {
69 let mut headers = reqwest::header::HeaderMap::new();
70 for (k, v) in &config.headers {
71 let name = reqwest::header::HeaderName::from_bytes(k.as_bytes())?;
72 let value = reqwest::header::HeaderValue::from_str(v)?;
73 headers.insert(name, value);
74 }
75
76 let client = reqwest::Client::builder()
77 .default_headers(headers)
78 .build()?;
79
80 for (type_name, endpoint) in &config.types {
81 match endpoint.update_method.as_str() {
82 "PATCH" | "PUT" => {}
83 other => {
84 return Err(anyhow!(
85 "invalid update_method {:?} for type {} (expected PATCH or PUT)",
86 other,
87 type_name
88 ));
89 }
90 }
91 }
92
93 Ok(Self { config, client })
94 }
95
96 async fn apply_create(
97 &self,
98 uid: Uid,
99 type_name: &TypeName,
100 desired: &alembic_core::Object,
101 schema: &Schema,
102 resolved: &mut BTreeMap<Uid, BackendId>,
103 ) -> Result<AppliedOp> {
104 let endpoint = self
105 .config
106 .types
107 .get(type_name.as_str())
108 .ok_or_else(|| anyhow!("no config for {}", type_name))?;
109 let type_schema = schema
110 .types
111 .get(type_name.as_str())
112 .ok_or_else(|| anyhow!("missing schema for {}", type_name))?;
113
114 let url = format!(
115 "{}/{}",
116 self.config.base_url.trim_end_matches('/'),
117 endpoint.path.trim_start_matches('/')
118 );
119 let body = resolve_attrs(&desired.attrs, type_schema, resolved)?;
120
121 let resp = self
122 .client
123 .post(&url)
124 .json(&body)
125 .send()
126 .await?
127 .error_for_status()?;
128 let body: serde_json::Value = resp.json().await?;
129
130 let id_val = resolve_path(&body, &endpoint.id_path)?;
131 let backend_id = match id_val {
132 serde_json::Value::Number(n) => {
133 BackendId::Int(n.as_u64().ok_or_else(|| anyhow!("invalid integer id"))?)
134 }
135 serde_json::Value::String(s) => BackendId::String(s),
136 _ => return Err(anyhow!("id must be number or string")),
137 };
138 resolved.insert(uid, backend_id.clone());
139
140 Ok(AppliedOp {
141 uid,
142 type_name: type_name.clone(),
143 backend_id: Some(backend_id),
144 })
145 }
146
147 async fn apply_update(
148 &self,
149 uid: Uid,
150 type_name: &TypeName,
151 desired: &alembic_core::Object,
152 backend_id: Option<&BackendId>,
153 schema: &Schema,
154 resolved: &BTreeMap<Uid, BackendId>,
155 ) -> Result<AppliedOp> {
156 let endpoint = self
157 .config
158 .types
159 .get(type_name.as_str())
160 .ok_or_else(|| anyhow!("no config for {}", type_name))?;
161 let type_schema = schema
162 .types
163 .get(type_name.as_str())
164 .ok_or_else(|| anyhow!("missing schema for {}", type_name))?;
165
166 let id = backend_id.ok_or_else(|| anyhow!("update requires backend id"))?;
167 let url = self.backend_id_to_url(endpoint, id);
168 let body = resolve_attrs(&desired.attrs, type_schema, resolved)?;
169
170 let req = match endpoint.update_method.as_str() {
171 "PUT" => self.client.put(&url),
172 _ => self.client.patch(&url),
173 };
174
175 req.json(&body).send().await?.error_for_status()?;
176
177 Ok(AppliedOp {
178 uid,
179 type_name: type_name.clone(),
180 backend_id: Some(id.clone()),
181 })
182 }
183
184 async fn apply_delete(&self, type_name: &TypeName, id: &BackendId) -> Result<()> {
185 let endpoint = self
186 .config
187 .types
188 .get(type_name.as_str())
189 .ok_or_else(|| anyhow!("no config for {}", type_name))?;
190
191 match endpoint.delete_strategy {
192 DeleteStrategy::Standard => {
193 let url = self.backend_id_to_url(endpoint, id);
194 self.client.delete(&url).send().await?.error_for_status()?;
195 }
196 DeleteStrategy::None => {
197 return Err(anyhow!(
198 "delete not supported for type {} (delete_strategy: none)",
199 type_name
200 ));
201 }
202 }
203 Ok(())
204 }
205
206 fn backend_id_to_url(&self, endpoint: &EndpointConfig, id: &BackendId) -> String {
207 let id_str = match id {
208 BackendId::Int(n) => n.to_string(),
209 BackendId::String(s) => s.clone(),
210 };
211 format!(
212 "{}/{}/{}",
213 self.config.base_url.trim_end_matches('/'),
214 endpoint.path.trim_matches('/'),
215 id_str
216 )
217 }
218}
219
220#[async_trait]
221impl Adapter for GenericAdapter {
222 async fn read(
223 &self,
224 schema: &Schema,
225 types: &[TypeName],
226 state_store: &alembic_engine::StateStore,
227 ) -> Result<ObservedState> {
228 let mut state = ObservedState::default();
229 let mappings = state_mappings(state_store);
230 let requested: BTreeSet<TypeName> = if types.is_empty() {
231 self.config
232 .types
233 .keys()
234 .map(|s| TypeName::new(s.clone()))
235 .collect()
236 } else {
237 types.iter().cloned().collect()
238 };
239
240 let mut tasks = Vec::new();
241 for type_name in requested {
242 let endpoint = self
243 .config
244 .types
245 .get(type_name.as_str())
246 .ok_or_else(|| anyhow!("no generic config for type {}", type_name))?
247 .clone();
248 let type_schema = schema
249 .types
250 .get(type_name.as_str())
251 .ok_or_else(|| anyhow!("missing schema for {}", type_name))?
252 .clone();
253
254 let client = self.client.clone();
255 let base_url = self.config.base_url.clone();
256 let mappings = mappings.clone();
257
258 tasks.push(tokio::spawn(async move {
259 let url = format!(
260 "{}/{}",
261 base_url.trim_end_matches('/'),
262 endpoint.path.trim_start_matches('/')
263 );
264 let resp = client.get(&url).send().await?.error_for_status()?;
265 let body: serde_json::Value = resp.json().await?;
266
267 let results = if let Some(path) = &endpoint.results_path {
268 let val = resolve_path(&body, path)?;
269 val.as_array()
270 .ok_or_else(|| {
271 anyhow!("expected array at path {} for {}", path, type_name)
272 })?
273 .clone()
274 } else if let Some(arr) = body.as_array() {
275 arr.clone()
276 } else {
277 return Err(anyhow!("expected array in list response for {}", type_name));
278 };
279
280 let mut observed = Vec::new();
281 for item in results {
282 let id_val = resolve_path(&item, &endpoint.id_path)?;
283 let backend_id = match id_val {
284 serde_json::Value::Number(n) => {
285 BackendId::Int(n.as_u64().ok_or_else(|| anyhow!("invalid integer id"))?)
286 }
287 serde_json::Value::String(s) => BackendId::String(s),
288 _ => return Err(anyhow!("id must be number or string")),
289 };
290
291 let attrs = match item {
292 serde_json::Value::Object(map) => {
293 map.into_iter().collect::<BTreeMap<_, _>>().into()
294 }
295 _ => return Err(anyhow!("expected object in results")),
296 };
297
298 let attrs = normalize_attrs_refs(&attrs, &type_schema, &mappings);
299 let key = build_key_from_schema(&type_schema, &attrs)?;
300
301 observed.push(ObservedObject {
302 type_name: type_name.clone(),
303 key,
304 attrs,
305 backend_id: Some(backend_id),
306 });
307 }
308 Ok::<Vec<ObservedObject>, anyhow::Error>(observed)
309 }));
310 }
311
312 let results = futures::future::join_all(tasks).await;
313 for result in results {
314 let objects = result??;
315 for object in objects {
316 state.insert(object);
317 }
318 }
319
320 Ok(state)
321 }
322
323 async fn write(
324 &self,
325 schema: &Schema,
326 ops: &[Op],
327 state: &alembic_engine::StateStore,
328 ) -> Result<ApplyReport> {
329 let mut applied = Vec::new();
330 let mut resolved = resolved_from_state(state);
331
332 let mut creates_updates = Vec::new();
333 let mut deletes = Vec::new();
334 for op in ops {
335 match op {
336 Op::Delete { .. } => deletes.push(op.clone()),
337 _ => creates_updates.push(op.clone()),
338 }
339 }
340
341 struct ApplyDriver<'a> {
342 adapter: &'a GenericAdapter,
343 resolved: &'a mut BTreeMap<Uid, BackendId>,
344 schema: &'a Schema,
345 }
346
347 #[async_trait]
348 impl RetryApplyDriver for ApplyDriver<'_> {
349 async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp> {
350 match op {
351 Op::Create {
352 uid,
353 type_name,
354 desired,
355 } => {
356 self.adapter
357 .apply_create(*uid, type_name, desired, self.schema, self.resolved)
358 .await
359 }
360 Op::Update {
361 uid,
362 type_name,
363 desired,
364 backend_id,
365 ..
366 } => {
367 self.adapter
368 .apply_update(
369 *uid,
370 type_name,
371 desired,
372 backend_id.as_ref(),
373 self.schema,
374 self.resolved,
375 )
376 .await
377 }
378 Op::Delete { .. } => unreachable!("delete ops filtered before retry"),
379 }
380 }
381
382 fn is_retryable(&self, err: &anyhow::Error) -> bool {
383 is_missing_ref_error(err)
384 }
385 }
386
387 let mut driver = ApplyDriver {
388 adapter: self,
389 resolved: &mut resolved,
390 schema,
391 };
392 let retry_result = apply_non_delete_with_retries(&creates_updates, &mut driver).await?;
393 if !retry_result.pending.is_empty() {
394 let missing = describe_missing_refs(&retry_result.pending, &resolved);
395 return Err(anyhow!("unresolved references: {missing}"));
396 }
397
398 for applied_op in retry_result.applied {
399 if let Some(backend_id) = &applied_op.backend_id {
400 resolved.insert(applied_op.uid, backend_id.clone());
401 }
402 applied.push(applied_op);
403 }
404
405 for op in deletes {
406 if let Op::Delete {
407 uid,
408 type_name,
409 backend_id,
410 ..
411 } = op
412 {
413 let id = backend_id.ok_or_else(|| anyhow!("delete requires backend id"))?;
414 self.apply_delete(&type_name, &id).await?;
415 applied.push(AppliedOp {
416 uid,
417 type_name,
418 backend_id: None,
419 });
420 }
421 }
422
423 Ok(ApplyReport {
424 applied,
425 ..Default::default()
426 })
427 }
428}
429
430fn resolve_path(value: &serde_json::Value, path: &str) -> Result<serde_json::Value> {
431 let mut current = value;
432 for segment in path.split('.') {
433 if segment.is_empty() {
434 continue;
435 }
436 current = current
437 .get(segment)
438 .ok_or_else(|| anyhow!("path segment not found: {}", segment))?;
439 }
440 Ok(current.clone())
441}
442
443#[derive(Debug, Default, Clone)]
444struct StateMappings {
445 by_type: BTreeMap<String, BTreeMap<BackendId, Uid>>,
446}
447
448impl StateMappings {
449 fn uid_for(&self, type_name: &str, backend_id: &BackendId) -> Option<Uid> {
450 self.by_type
451 .get(type_name)
452 .and_then(|mapping| mapping.get(backend_id).copied())
453 }
454}
455
456fn state_mappings(state: &alembic_engine::StateStore) -> StateMappings {
457 let mut by_type = BTreeMap::new();
458 for (type_name, mapping) in state.all_mappings() {
459 let mut id_to_uid = BTreeMap::new();
460 for (uid, backend_id) in mapping {
461 id_to_uid.insert(backend_id.clone(), *uid);
462 }
463 by_type.insert(type_name.as_str().to_string(), id_to_uid);
464 }
465 StateMappings { by_type }
466}
467
468fn build_key_from_schema(type_schema: &alembic_core::TypeSchema, attrs: &JsonMap) -> Result<Key> {
469 let mut map = BTreeMap::new();
470 for field in type_schema.key.keys() {
471 let Some(value) = attrs.get(field) else {
472 return Err(anyhow!("missing key field {field}"));
473 };
474 map.insert(field.clone(), value.clone());
475 }
476 Ok(Key::from(map))
477}
478
479fn resolved_from_state(state: &alembic_engine::StateStore) -> BTreeMap<Uid, BackendId> {
480 let mut resolved = BTreeMap::new();
481 for mapping in state.all_mappings().values() {
482 for (uid, backend_id) in mapping {
483 resolved.insert(*uid, backend_id.clone());
484 }
485 }
486 resolved
487}
488
489fn normalize_attrs_refs(
490 attrs: &JsonMap,
491 type_schema: &alembic_core::TypeSchema,
492 mappings: &StateMappings,
493) -> JsonMap {
494 let mut normalized = attrs.clone();
495 for (field, schema) in &type_schema.fields {
496 match &schema.r#type {
497 alembic_core::FieldType::Ref { target } => {
498 if let Some(value) = attrs.get(field) {
499 normalized.insert(
500 field.clone(),
501 normalize_ref_value(value.clone(), target, mappings),
502 );
503 }
504 }
505 alembic_core::FieldType::ListRef { target } => {
506 if let Some(value) = attrs.get(field) {
507 let updated = if let serde_json::Value::Array(items) = value {
508 let mapped = items
509 .iter()
510 .cloned()
511 .map(|item| normalize_ref_value(item, target, mappings))
512 .collect::<Vec<_>>();
513 serde_json::Value::Array(mapped)
514 } else {
515 value.clone()
516 };
517 normalized.insert(field.clone(), updated);
518 }
519 }
520 _ => {}
521 }
522 }
523 normalized
524}
525
526fn normalize_ref_value(
527 value: serde_json::Value,
528 target: &str,
529 mappings: &StateMappings,
530) -> serde_json::Value {
531 if value.is_null() {
532 return value;
533 }
534 let backend_id = match backend_id_from_value(&value) {
535 Some(id) => id,
536 None => return value,
537 };
538 mappings
539 .uid_for(target, &backend_id)
540 .map(|uid| serde_json::Value::String(uid.to_string()))
541 .unwrap_or(value)
542}
543
544fn backend_id_from_value(value: &serde_json::Value) -> Option<BackendId> {
545 match value {
546 serde_json::Value::Number(n) => n.as_u64().map(BackendId::Int).or_else(|| {
547 n.as_i64()
548 .and_then(|v| u64::try_from(v).ok())
549 .map(BackendId::Int)
550 }),
551 serde_json::Value::String(s) => Some(BackendId::String(s.clone())),
552 serde_json::Value::Object(map) => map.get("id").and_then(backend_id_from_value),
553 _ => None,
554 }
555}
556
557fn resolve_attrs(
558 attrs: &JsonMap,
559 type_schema: &alembic_core::TypeSchema,
560 resolved: &BTreeMap<Uid, BackendId>,
561) -> Result<serde_json::Value> {
562 let mut map = serde_json::Map::new();
563 for (key, value) in attrs.iter() {
564 let field_schema = type_schema
565 .fields
566 .get(key)
567 .ok_or_else(|| anyhow!("missing schema for field {key}"))?;
568 map.insert(
569 key.clone(),
570 resolve_value_for_type(&field_schema.r#type, value.clone(), resolved)?,
571 );
572 }
573 Ok(serde_json::Value::Object(map))
574}
575
576fn resolve_value_for_type(
577 field_type: &alembic_core::FieldType,
578 value: serde_json::Value,
579 resolved: &BTreeMap<Uid, BackendId>,
580) -> Result<serde_json::Value> {
581 match field_type {
582 alembic_core::FieldType::Ref { .. } => resolve_ref_value(value, resolved),
583 alembic_core::FieldType::ListRef { .. } => {
584 let serde_json::Value::Array(items) = value else {
585 return Err(anyhow!("expected array for list_ref"));
586 };
587 let mut out = Vec::new();
588 for item in items {
589 out.push(resolve_ref_value(item, resolved)?);
590 }
591 Ok(serde_json::Value::Array(out))
592 }
593 _ => Ok(value),
594 }
595}
596
597fn resolve_ref_value(
598 value: serde_json::Value,
599 resolved: &BTreeMap<Uid, BackendId>,
600) -> Result<serde_json::Value> {
601 let serde_json::Value::String(raw) = value else {
602 return Err(anyhow!("ref must be uuid string"));
603 };
604 let uid = Uid::parse_str(&raw).map_err(|_| anyhow!("invalid uuid: {}", raw))?;
605 let id = resolved
606 .get(&uid)
607 .ok_or(AdapterApplyError::MissingRef { uid })?;
608 Ok(match id {
609 BackendId::Int(n) => serde_json::Value::Number((*n).into()),
610 BackendId::String(s) => serde_json::Value::String(s.clone()),
611 })
612}
613
614fn is_missing_ref_error(err: &anyhow::Error) -> bool {
615 err.downcast_ref::<AdapterApplyError>()
616 .is_some_and(|e| matches!(e, AdapterApplyError::MissingRef { .. }))
617}
618
619fn describe_missing_refs(ops: &[Op], resolved: &BTreeMap<Uid, BackendId>) -> String {
620 let mut missing = BTreeSet::new();
621 for op in ops {
622 if let Op::Create { desired, .. } | Op::Update { desired, .. } = op {
623 for value in desired.attrs.values() {
624 collect_missing_refs(value, resolved, &mut missing);
625 }
626 }
627 }
628 missing
629 .into_iter()
630 .map(|uid| uid.to_string())
631 .collect::<Vec<_>>()
632 .join(", ")
633}
634
635fn collect_missing_refs(
636 value: &serde_json::Value,
637 resolved: &BTreeMap<Uid, BackendId>,
638 missing: &mut BTreeSet<Uid>,
639) {
640 match value {
641 serde_json::Value::String(raw) => {
642 if let Ok(uid) = Uid::parse_str(raw) {
643 if !resolved.contains_key(&uid) {
644 missing.insert(uid);
645 }
646 }
647 }
648 serde_json::Value::Array(items) => {
649 for item in items {
650 collect_missing_refs(item, resolved, missing);
651 }
652 }
653 serde_json::Value::Object(map) => {
654 for value in map.values() {
655 collect_missing_refs(value, resolved, missing);
656 }
657 }
658 _ => {}
659 }
660}
661
662#[cfg(test)]
663mod tests;