Skip to main content

alembic_adapter_registry/
lib.rs

1//! adapter registry and config loading for alembic.
2
3use alembic_engine::{
4    Adapter, ApplyReport, BackendId, ObservedObject, ObservedState, Op, ProvisionReport, StateData,
5    StateStore,
6};
7use anyhow::{anyhow, Context, Result};
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use serde_json::Value as JsonValue;
10use std::collections::BTreeMap;
11use std::fs;
12use std::path::PathBuf;
13use std::process::Stdio;
14use std::time::Duration;
15use tokio::io::AsyncWriteExt;
16use tokio::process::Command;
17use tokio::time::timeout;
18
/// Backend identifiers listed in the "unsupported backend" error message.
///
/// Keep this list in sync with the match arms in `AdapterConfig::from_env`.
const SUPPORTED_BACKENDS: &[&str] = &[
    "netbox",
    "nautobot",
    "infrahub",
    "generic",
    "peeringdb",
    "external",
];
27
28#[derive(Debug, Deserialize)]
29#[serde(tag = "backend", rename_all = "kebab-case")]
30pub enum AdapterConfig {
31    Netbox(NetboxConfig),
32    Nautobot(NautobotConfig),
33    Infrahub(InfrahubConfig),
34    Generic(GenericConfig),
35    Peeringdb,
36    External(ExternalConfig),
37}
38
39#[derive(Debug, Deserialize)]
40pub struct NetboxConfig {
41    pub url: Option<String>,
42    pub token: Option<String>,
43}
44
45#[derive(Debug, Deserialize)]
46pub struct NautobotConfig {
47    pub url: Option<String>,
48    pub token: Option<String>,
49}
50
51#[derive(Debug, Deserialize)]
52pub struct InfrahubConfig {
53    pub url: Option<String>,
54    pub token: Option<String>,
55    pub branch: Option<String>,
56    #[serde(default)]
57    pub schema: Option<InfrahubSchemaConfig>,
58}
59
60#[derive(Debug, Deserialize)]
61pub struct GenericConfig {
62    pub config: Option<alembic_adapter_generic::GenericConfig>,
63    pub config_path: Option<PathBuf>,
64}
65
66#[derive(Debug, Deserialize)]
67pub struct ExternalConfig {
68    pub command: Option<String>,
69    #[serde(default)]
70    pub args: Vec<String>,
71    pub working_dir: Option<PathBuf>,
72    #[serde(default)]
73    pub env: BTreeMap<String, String>,
74    pub timeout_seconds: Option<u64>,
75}
76
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Default)]
78#[serde(rename_all = "kebab-case")]
79pub enum InfrahubSchemaMode {
80    #[default]
81    None,
82    Infrahubctl,
83    Repository,
84}
85
86#[derive(Debug, Deserialize)]
87pub struct InfrahubSchemaConfig {
88    #[serde(default)]
89    pub mode: InfrahubSchemaMode,
90    pub schema_path: Option<PathBuf>,
91    pub repository_id: Option<String>,
92    pub repository_name: Option<String>,
93    pub repository_root: Option<PathBuf>,
94    pub branch: Option<String>,
95    pub infrahubctl_path: Option<PathBuf>,
96}
97
98impl AdapterConfig {
99    fn backend_name(&self) -> &'static str {
100        match self {
101            AdapterConfig::Netbox(_) => "netbox",
102            AdapterConfig::Nautobot(_) => "nautobot",
103            AdapterConfig::Infrahub(_) => "infrahub",
104            AdapterConfig::Generic(_) => "generic",
105            AdapterConfig::Peeringdb => "peeringdb",
106            AdapterConfig::External(_) => "external",
107        }
108    }
109
110    fn from_env(backend: &str) -> Result<Self> {
111        match backend.to_lowercase().as_str() {
112            "netbox" => Ok(AdapterConfig::Netbox(NetboxConfig {
113                url: None,
114                token: None,
115            })),
116            "nautobot" => Ok(AdapterConfig::Nautobot(NautobotConfig {
117                url: None,
118                token: None,
119            })),
120            "infrahub" => Ok(AdapterConfig::Infrahub(InfrahubConfig {
121                url: None,
122                token: None,
123                branch: None,
124                schema: None,
125            })),
126            "generic" => Ok(AdapterConfig::Generic(GenericConfig {
127                config: None,
128                config_path: None,
129            })),
130            "peeringdb" => Ok(AdapterConfig::Peeringdb),
131            "external" => Ok(AdapterConfig::External(ExternalConfig {
132                command: None,
133                args: Vec::new(),
134                working_dir: None,
135                env: BTreeMap::new(),
136                timeout_seconds: None,
137            })),
138            other => Err(anyhow!(
139                "unsupported backend {other} (expected one of: {})",
140                SUPPORTED_BACKENDS.join(", ")
141            )),
142        }
143    }
144
145    fn build(self) -> Result<Box<dyn Adapter>> {
146        match self {
147            AdapterConfig::Netbox(cfg) => {
148                let (url, token) = resolve_credentials("NETBOX", cfg.url, cfg.token)?;
149                Ok(Box::new(alembic_adapter_netbox::NetBoxAdapter::new(
150                    &url, &token,
151                )?))
152            }
153            AdapterConfig::Nautobot(cfg) => {
154                let (url, token) = resolve_credentials("NAUTOBOT", cfg.url, cfg.token)?;
155                Ok(Box::new(alembic_adapter_nautobot::NautobotAdapter::new(
156                    &url, &token,
157                )?))
158            }
159            AdapterConfig::Infrahub(cfg) => {
160                let (url, token) = resolve_credentials("INFRAHUB", cfg.url, cfg.token)?;
161                let mut adapter = alembic_adapter_infrahub::InfrahubAdapter::new(
162                    &url,
163                    &token,
164                    cfg.branch.as_deref(),
165                )?;
166                if let Some(schema_cfg) = cfg.schema {
167                    if let Some(schema_push) = schema_cfg.build()? {
168                        adapter = adapter.with_schema_push(schema_push);
169                    }
170                }
171                Ok(Box::new(adapter))
172            }
173            AdapterConfig::Generic(cfg) => {
174                if cfg.config.is_some() && cfg.config_path.is_some() {
175                    return Err(anyhow!(
176                        "generic adapter config cannot include both config and config_path"
177                    ));
178                }
179                let config = if let Some(config) = cfg.config {
180                    config
181                } else {
182                    let path = cfg
183                        .config_path
184                        .or_else(|| std::env::var("GENERIC_CONFIG").ok().map(PathBuf::from));
185                    let path =
186                        path.ok_or_else(|| anyhow!("generic backend requires config_path"))?;
187                    let content = fs::read_to_string(&path)
188                        .with_context(|| format!("read generic config: {}", path.display()))?;
189                    serde_yaml::from_str(&content)
190                        .with_context(|| format!("parse generic config: {}", path.display()))?
191                };
192                Ok(Box::new(alembic_adapter_generic::GenericAdapter::new(
193                    config,
194                )?))
195            }
196            AdapterConfig::Peeringdb => {
197                Ok(Box::new(alembic_adapter_peeringdb::PeeringDBAdapter::new()))
198            }
199            AdapterConfig::External(cfg) => Ok(Box::new(ProcessAdapter::new(cfg)?)),
200        }
201    }
202}
203
/// Adapter that delegates every operation to an external command, exchanging
/// JSON over the child's stdin and stdout.
#[derive(Debug, Clone)]
struct ProcessAdapter {
    /// Program to spawn for each call.
    command: String,
    /// Arguments passed to the program.
    args: Vec<String>,
    /// Working directory for the child process, if any.
    working_dir: Option<PathBuf>,
    /// Extra environment variables set on the child process.
    env: BTreeMap<String, String>,
    /// Upper bound applied to each invocation.
    timeout: Duration,
}
212
213impl ProcessAdapter {
214    fn new(cfg: ExternalConfig) -> Result<Self> {
215        let command = cfg
216            .command
217            .or_else(|| std::env::var("EXTERNAL_COMMAND").ok())
218            .ok_or_else(|| anyhow!("external backend requires command"))?;
219        let timeout = Duration::from_secs(cfg.timeout_seconds.unwrap_or(120));
220        Ok(Self {
221            command,
222            args: cfg.args,
223            working_dir: cfg.working_dir,
224            env: cfg.env,
225            timeout,
226        })
227    }
228
229    async fn call<R: DeserializeOwned>(&self, request: ExternalRequest<'_>) -> Result<R> {
230        let envelope = ExternalEnvelope {
231            version: 1,
232            request,
233        };
234        let payload = serde_json::to_vec(&envelope).context("serialize external request")?;
235        let output = self.run(payload).await?;
236        let stdout =
237            String::from_utf8(output.stdout).context("external adapter response not utf-8")?;
238        let response: ExternalResponse<JsonValue> =
239            serde_json::from_str(&stdout).context("parse external adapter response")?;
240        if !response.ok {
241            let message = response
242                .error
243                .unwrap_or_else(|| "external adapter error".to_string());
244            return Err(anyhow!(message));
245        }
246        let result = response
247            .result
248            .ok_or_else(|| anyhow!("external adapter response missing result"))?;
249        serde_json::from_value(result).context("deserialize external adapter result")
250    }
251
252    async fn run(&self, payload: Vec<u8>) -> Result<std::process::Output> {
253        let mut cmd = Command::new(&self.command);
254        cmd.args(&self.args)
255            .stdin(Stdio::piped())
256            .stdout(Stdio::piped())
257            .stderr(Stdio::piped());
258        if let Some(dir) = &self.working_dir {
259            cmd.current_dir(dir);
260        }
261        for (key, value) in &self.env {
262            cmd.env(key, value);
263        }
264
265        let mut child = cmd.spawn().context("spawn external adapter")?;
266        if let Some(mut stdin) = child.stdin.take() {
267            stdin
268                .write_all(&payload)
269                .await
270                .context("write external adapter stdin")?;
271        }
272
273        let output = timeout(self.timeout, child.wait_with_output())
274            .await
275            .context("external adapter timed out")?
276            .context("wait for external adapter")?;
277
278        if !output.status.success() {
279            let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
280            return Err(anyhow!(
281                "external adapter exited with {}: {}",
282                output.status,
283                stderr
284            ));
285        }
286
287        Ok(output)
288    }
289}
290
291#[async_trait::async_trait]
292impl Adapter for ProcessAdapter {
293    async fn read(
294        &self,
295        schema: &alembic_core::Schema,
296        types: &[alembic_core::TypeName],
297        state: &StateStore,
298    ) -> Result<ObservedState> {
299        let state = StateData {
300            mappings: state.all_mappings().clone(),
301        };
302        let objects: Vec<ObservedObjectData> = self
303            .call(ExternalRequest::Read {
304                schema,
305                types,
306                state,
307            })
308            .await?;
309        let mut observed = ObservedState::default();
310        for object in objects {
311            observed.insert(ObservedObject {
312                type_name: object.type_name,
313                key: object.key,
314                attrs: object.attrs,
315                backend_id: object.backend_id,
316            });
317        }
318        Ok(observed)
319    }
320
321    async fn write(
322        &self,
323        schema: &alembic_core::Schema,
324        ops: &[Op],
325        state: &StateStore,
326    ) -> Result<ApplyReport> {
327        let state = StateData {
328            mappings: state.all_mappings().clone(),
329        };
330        self.call(ExternalRequest::Write { schema, ops, state })
331            .await
332    }
333
334    async fn ensure_schema(&self, schema: &alembic_core::Schema) -> Result<ProvisionReport> {
335        self.call(ExternalRequest::EnsureSchema { schema }).await
336    }
337}
338
339#[derive(Debug, Serialize)]
340struct ExternalEnvelope<'a> {
341    version: u8,
342    #[serde(flatten)]
343    request: ExternalRequest<'a>,
344}
345
346#[derive(Debug, Serialize)]
347#[serde(tag = "method", rename_all = "snake_case")]
348enum ExternalRequest<'a> {
349    Read {
350        schema: &'a alembic_core::Schema,
351        types: &'a [alembic_core::TypeName],
352        state: StateData,
353    },
354    Write {
355        schema: &'a alembic_core::Schema,
356        ops: &'a [Op],
357        state: StateData,
358    },
359    EnsureSchema {
360        schema: &'a alembic_core::Schema,
361    },
362}
363
364#[derive(Debug, Deserialize)]
365struct ExternalResponse<T> {
366    ok: bool,
367    result: Option<T>,
368    error: Option<String>,
369}
370
371#[derive(Debug, Deserialize)]
372struct ObservedObjectData {
373    type_name: alembic_core::TypeName,
374    key: alembic_core::Key,
375    attrs: alembic_core::JsonMap,
376    backend_id: Option<BackendId>,
377}
378
379impl InfrahubSchemaConfig {
380    fn build(self) -> Result<Option<alembic_adapter_infrahub::SchemaPushConfig>> {
381        if self.mode == InfrahubSchemaMode::None {
382            return Ok(None);
383        }
384        let schema_path = self
385            .schema_path
386            .ok_or_else(|| anyhow!("infrahub schema requires schema_path"))?;
387        let mode = match self.mode {
388            InfrahubSchemaMode::Infrahubctl => {
389                alembic_adapter_infrahub::SchemaApplyMode::Infrahubctl
390            }
391            InfrahubSchemaMode::Repository => alembic_adapter_infrahub::SchemaApplyMode::Repository,
392            InfrahubSchemaMode::None => alembic_adapter_infrahub::SchemaApplyMode::Infrahubctl,
393        };
394        let config = alembic_adapter_infrahub::SchemaPushConfig {
395            schema_path,
396            mode,
397            repository_id: self.repository_id,
398            repository_name: self.repository_name,
399            repository_root: self.repository_root,
400            branch: self.branch,
401            infrahubctl_path: self.infrahubctl_path,
402        };
403
404        if config.mode == alembic_adapter_infrahub::SchemaApplyMode::Repository {
405            if config.repository_root.is_none() {
406                return Err(anyhow!(
407                    "infrahub schema repository mode requires repository_root"
408                ));
409            }
410            if config.repository_id.is_none() && config.repository_name.is_none() {
411                return Err(anyhow!(
412                    "infrahub schema repository mode requires repository_id or repository_name"
413                ));
414            }
415        }
416
417        Ok(Some(config))
418    }
419}
420
421pub fn create_adapter(
422    backend: Option<&str>,
423    config_path: Option<PathBuf>,
424) -> Result<Box<dyn Adapter>> {
425    let config = if let Some(path) = config_path {
426        let content = fs::read_to_string(&path)
427            .with_context(|| format!("read adapter config: {}", path.display()))?;
428        let config: AdapterConfig = serde_yaml::from_str(&content)
429            .with_context(|| format!("parse adapter config: {}", path.display()))?;
430        if let Some(backend) = backend {
431            if backend.to_lowercase() != config.backend_name() {
432                return Err(anyhow!(
433                    "backend {backend} does not match config backend {}",
434                    config.backend_name()
435                ));
436            }
437        }
438        config
439    } else {
440        let backend =
441            backend.ok_or_else(|| anyhow!("--backend or --backend-config is required"))?;
442        AdapterConfig::from_env(backend)?
443    };
444
445    config.build()
446}
447
448pub fn resolve_credentials(
449    prefix: &str,
450    url: Option<String>,
451    token: Option<String>,
452) -> Result<(String, String)> {
453    let env_url = format!("{}_URL", prefix);
454    let env_token = format!("{}_TOKEN", prefix);
455    let url = url
456        .or_else(|| std::env::var(&env_url).ok())
457        .ok_or_else(|| anyhow!("missing {env_url} (or url in backend config)"))?;
458    let token = token
459        .or_else(|| std::env::var(&env_token).ok())
460        .ok_or_else(|| anyhow!("missing {env_token} (or token in backend config)"))?;
461    Ok((url, token))
462}
463
#[cfg(test)]
mod tests {
    use super::resolve_credentials;
    use super::AdapterConfig;
    use super::ExternalConfig;
    use super::InfrahubSchemaConfig;
    use super::InfrahubSchemaMode;
    use alembic_core::{JsonMap, Key, Object, Schema, TypeName, Uid};
    use alembic_engine::{BackendId, Op, StateData, StateStore};
    use serde_json::json;
    use std::collections::BTreeMap;
    use std::fs;
    use std::path::Path;
    use std::sync::{Mutex, MutexGuard, OnceLock};
    use tempfile::tempdir;

    /// Serializes tests that touch process-wide environment variables.
    ///
    /// A poisoned lock is recovered instead of unwrapped so that one failing
    /// test does not make every other environment test fail as well.
    fn env_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Puts an environment variable back to its captured value on drop, so the
    /// restore also happens when the test body panics.
    struct EnvRestore {
        name: &'static str,
        previous: Option<String>,
    }

    impl EnvRestore {
        /// Sets `name` to `value`, remembering the previous value.
        fn set(name: &'static str, value: &str) -> Self {
            let previous = std::env::var(name).ok();
            std::env::set_var(name, value);
            Self { name, previous }
        }

        /// Removes `name`, remembering the previous value.
        fn unset(name: &'static str) -> Self {
            let previous = std::env::var(name).ok();
            std::env::remove_var(name);
            Self { name, previous }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(self.name, value),
                None => std::env::remove_var(self.name),
            }
        }
    }

    #[test]
    fn resolve_credentials_prefers_args() {
        let _guard = env_lock();
        let creds = resolve_credentials(
            "NETBOX",
            Some("http://example".to_string()),
            Some("token".to_string()),
        )
        .unwrap();
        assert_eq!(creds.0, "http://example");
        assert_eq!(creds.1, "token");
    }

    #[test]
    fn resolve_credentials_from_env() {
        // Declared first so it is dropped last: the variables are restored
        // while the lock is still held.
        let _guard = env_lock();
        let _url = EnvRestore::set("NETBOX_URL", "http://env");
        let _token = EnvRestore::set("NETBOX_TOKEN", "envtoken");

        let creds = resolve_credentials("NETBOX", None, None).unwrap();
        assert_eq!(creds.0, "http://env");
        assert_eq!(creds.1, "envtoken");
    }

    #[test]
    fn resolve_credentials_missing_is_error() {
        let _guard = env_lock();
        let _url = EnvRestore::unset("NETBOX_URL");
        let _token = EnvRestore::unset("NETBOX_TOKEN");

        let result = resolve_credentials("NETBOX", None, None);
        assert!(result.is_err());
    }

    #[test]
    fn infrahub_schema_none_is_noop() {
        let config = InfrahubSchemaConfig {
            mode: InfrahubSchemaMode::None,
            schema_path: None,
            repository_id: None,
            repository_name: None,
            repository_root: None,
            branch: None,
            infrahubctl_path: None,
        };
        assert!(config.build().unwrap().is_none());
    }

    #[cfg(unix)]
    fn make_executable(path: &Path) {
        use std::os::unix::fs::PermissionsExt;
        let mut perms = fs::metadata(path).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(path, perms).unwrap();
    }

    /// Writes `contents` to `path` and, on Unix, marks the file executable.
    fn write_script(path: &Path, contents: &str) {
        fs::write(path, contents).unwrap();
        #[cfg(unix)]
        make_executable(path);
    }

    #[tokio::test]
    async fn external_adapter_roundtrip() {
        let dir = tempdir().unwrap();
        let script_path = dir.path().join("adapter.sh");
        // Minimal adapter: answers each method with a canned JSON response.
        let script = r#"#!/usr/bin/env bash
set -euo pipefail
input="$(cat)"
if [[ "$input" == *"\"method\":\"read\""* ]]; then
  cat <<'JSON'
{"ok":true,"result":[{"type_name":"dcim.site","key":{"name":"site-a"},"attrs":{"name":"Site A"},"backend_id":"site-1"}]}
JSON
elif [[ "$input" == *"\"method\":\"write\""* ]]; then
  cat <<'JSON'
{"ok":true,"result":{"applied":[{"uid":"00000000-0000-0000-0000-000000000001","type_name":"dcim.site","backend_id":"site-1"}]}}
JSON
elif [[ "$input" == *"\"method\":\"ensure_schema\""* ]]; then
  cat <<'JSON'
{"ok":true,"result":{"created_fields":["field1"],"created_tags":[],"created_object_types":["dcim.site"],"created_object_fields":["dcim.site.name"]}}
JSON
else
  echo '{"ok":false,"error":"unknown method"}'
fi
"#;
        write_script(&script_path, script);

        let config = AdapterConfig::External(ExternalConfig {
            command: Some(script_path.to_string_lossy().to_string()),
            args: Vec::new(),
            working_dir: None,
            env: BTreeMap::new(),
            timeout_seconds: Some(5),
        });
        let adapter = config.build().unwrap();
        let schema = Schema {
            types: BTreeMap::new(),
        };
        let state = StateStore::new(None, StateData::default());

        let observed = adapter.read(&schema, &[], &state).await.unwrap();
        assert_eq!(observed.by_key.len(), 1);

        let uid = Uid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let key = Key::from(BTreeMap::from([("name".to_string(), json!("site-a"))]));
        let obj = Object {
            uid,
            type_name: TypeName::new("dcim.site"),
            key,
            attrs: JsonMap::from(BTreeMap::from([("name".to_string(), json!("Site A"))])),
            source: None,
        };
        let ops = vec![Op::Create {
            uid,
            type_name: TypeName::new("dcim.site"),
            desired: obj,
        }];
        let report = adapter.write(&schema, &ops, &state).await.unwrap();
        assert_eq!(report.applied.len(), 1);
        assert_eq!(
            report.applied[0].backend_id,
            Some(BackendId::String("site-1".to_string()))
        );

        let provision = adapter.ensure_schema(&schema).await.unwrap();
        assert!(provision
            .created_object_types
            .contains(&"dcim.site".to_string()));
    }
}