1use alembic_engine::{
4 Adapter, ApplyReport, BackendId, ObservedObject, ObservedState, Op, ProvisionReport, StateData,
5 StateStore,
6};
7use anyhow::{anyhow, Context, Result};
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use serde_json::Value as JsonValue;
10use std::collections::BTreeMap;
11use std::fs;
12use std::path::PathBuf;
13use std::process::Stdio;
14use std::time::Duration;
15use tokio::io::AsyncWriteExt;
16use tokio::process::Command;
17use tokio::time::timeout;
18
19const SUPPORTED_BACKENDS: &[&str] = &[
20 "netbox",
21 "nautobot",
22 "infrahub",
23 "generic",
24 "peeringdb",
25 "external",
26];
27
28#[derive(Debug, Deserialize)]
29#[serde(tag = "backend", rename_all = "kebab-case")]
30pub enum AdapterConfig {
31 Netbox(NetboxConfig),
32 Nautobot(NautobotConfig),
33 Infrahub(InfrahubConfig),
34 Generic(GenericConfig),
35 Peeringdb,
36 External(ExternalConfig),
37}
38
39#[derive(Debug, Deserialize)]
40pub struct NetboxConfig {
41 pub url: Option<String>,
42 pub token: Option<String>,
43}
44
45#[derive(Debug, Deserialize)]
46pub struct NautobotConfig {
47 pub url: Option<String>,
48 pub token: Option<String>,
49}
50
51#[derive(Debug, Deserialize)]
52pub struct InfrahubConfig {
53 pub url: Option<String>,
54 pub token: Option<String>,
55 pub branch: Option<String>,
56 #[serde(default)]
57 pub schema: Option<InfrahubSchemaConfig>,
58}
59
60#[derive(Debug, Deserialize)]
61pub struct GenericConfig {
62 pub config: Option<alembic_adapter_generic::GenericConfig>,
63 pub config_path: Option<PathBuf>,
64}
65
66#[derive(Debug, Deserialize)]
67pub struct ExternalConfig {
68 pub command: Option<String>,
69 #[serde(default)]
70 pub args: Vec<String>,
71 pub working_dir: Option<PathBuf>,
72 #[serde(default)]
73 pub env: BTreeMap<String, String>,
74 pub timeout_seconds: Option<u64>,
75}
76
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Default)]
78#[serde(rename_all = "kebab-case")]
79pub enum InfrahubSchemaMode {
80 #[default]
81 None,
82 Infrahubctl,
83 Repository,
84}
85
86#[derive(Debug, Deserialize)]
87pub struct InfrahubSchemaConfig {
88 #[serde(default)]
89 pub mode: InfrahubSchemaMode,
90 pub schema_path: Option<PathBuf>,
91 pub repository_id: Option<String>,
92 pub repository_name: Option<String>,
93 pub repository_root: Option<PathBuf>,
94 pub branch: Option<String>,
95 pub infrahubctl_path: Option<PathBuf>,
96}
97
98impl AdapterConfig {
99 fn backend_name(&self) -> &'static str {
100 match self {
101 AdapterConfig::Netbox(_) => "netbox",
102 AdapterConfig::Nautobot(_) => "nautobot",
103 AdapterConfig::Infrahub(_) => "infrahub",
104 AdapterConfig::Generic(_) => "generic",
105 AdapterConfig::Peeringdb => "peeringdb",
106 AdapterConfig::External(_) => "external",
107 }
108 }
109
110 fn from_env(backend: &str) -> Result<Self> {
111 match backend.to_lowercase().as_str() {
112 "netbox" => Ok(AdapterConfig::Netbox(NetboxConfig {
113 url: None,
114 token: None,
115 })),
116 "nautobot" => Ok(AdapterConfig::Nautobot(NautobotConfig {
117 url: None,
118 token: None,
119 })),
120 "infrahub" => Ok(AdapterConfig::Infrahub(InfrahubConfig {
121 url: None,
122 token: None,
123 branch: None,
124 schema: None,
125 })),
126 "generic" => Ok(AdapterConfig::Generic(GenericConfig {
127 config: None,
128 config_path: None,
129 })),
130 "peeringdb" => Ok(AdapterConfig::Peeringdb),
131 "external" => Ok(AdapterConfig::External(ExternalConfig {
132 command: None,
133 args: Vec::new(),
134 working_dir: None,
135 env: BTreeMap::new(),
136 timeout_seconds: None,
137 })),
138 other => Err(anyhow!(
139 "unsupported backend {other} (expected one of: {})",
140 SUPPORTED_BACKENDS.join(", ")
141 )),
142 }
143 }
144
145 fn build(self) -> Result<Box<dyn Adapter>> {
146 match self {
147 AdapterConfig::Netbox(cfg) => {
148 let (url, token) = resolve_credentials("NETBOX", cfg.url, cfg.token)?;
149 Ok(Box::new(alembic_adapter_netbox::NetBoxAdapter::new(
150 &url, &token,
151 )?))
152 }
153 AdapterConfig::Nautobot(cfg) => {
154 let (url, token) = resolve_credentials("NAUTOBOT", cfg.url, cfg.token)?;
155 Ok(Box::new(alembic_adapter_nautobot::NautobotAdapter::new(
156 &url, &token,
157 )?))
158 }
159 AdapterConfig::Infrahub(cfg) => {
160 let (url, token) = resolve_credentials("INFRAHUB", cfg.url, cfg.token)?;
161 let mut adapter = alembic_adapter_infrahub::InfrahubAdapter::new(
162 &url,
163 &token,
164 cfg.branch.as_deref(),
165 )?;
166 if let Some(schema_cfg) = cfg.schema {
167 if let Some(schema_push) = schema_cfg.build()? {
168 adapter = adapter.with_schema_push(schema_push);
169 }
170 }
171 Ok(Box::new(adapter))
172 }
173 AdapterConfig::Generic(cfg) => {
174 if cfg.config.is_some() && cfg.config_path.is_some() {
175 return Err(anyhow!(
176 "generic adapter config cannot include both config and config_path"
177 ));
178 }
179 let config = if let Some(config) = cfg.config {
180 config
181 } else {
182 let path = cfg
183 .config_path
184 .or_else(|| std::env::var("GENERIC_CONFIG").ok().map(PathBuf::from));
185 let path =
186 path.ok_or_else(|| anyhow!("generic backend requires config_path"))?;
187 let content = fs::read_to_string(&path)
188 .with_context(|| format!("read generic config: {}", path.display()))?;
189 serde_yaml::from_str(&content)
190 .with_context(|| format!("parse generic config: {}", path.display()))?
191 };
192 Ok(Box::new(alembic_adapter_generic::GenericAdapter::new(
193 config,
194 )?))
195 }
196 AdapterConfig::Peeringdb => {
197 Ok(Box::new(alembic_adapter_peeringdb::PeeringDBAdapter::new()))
198 }
199 AdapterConfig::External(cfg) => Ok(Box::new(ProcessAdapter::new(cfg)?)),
200 }
201 }
202}
203
204#[derive(Debug, Clone)]
205struct ProcessAdapter {
206 command: String,
207 args: Vec<String>,
208 working_dir: Option<PathBuf>,
209 env: BTreeMap<String, String>,
210 timeout: Duration,
211}
212
213impl ProcessAdapter {
214 fn new(cfg: ExternalConfig) -> Result<Self> {
215 let command = cfg
216 .command
217 .or_else(|| std::env::var("EXTERNAL_COMMAND").ok())
218 .ok_or_else(|| anyhow!("external backend requires command"))?;
219 let timeout = Duration::from_secs(cfg.timeout_seconds.unwrap_or(120));
220 Ok(Self {
221 command,
222 args: cfg.args,
223 working_dir: cfg.working_dir,
224 env: cfg.env,
225 timeout,
226 })
227 }
228
229 async fn call<R: DeserializeOwned>(&self, request: ExternalRequest<'_>) -> Result<R> {
230 let envelope = ExternalEnvelope {
231 version: 1,
232 request,
233 };
234 let payload = serde_json::to_vec(&envelope).context("serialize external request")?;
235 let output = self.run(payload).await?;
236 let stdout =
237 String::from_utf8(output.stdout).context("external adapter response not utf-8")?;
238 let response: ExternalResponse<JsonValue> =
239 serde_json::from_str(&stdout).context("parse external adapter response")?;
240 if !response.ok {
241 let message = response
242 .error
243 .unwrap_or_else(|| "external adapter error".to_string());
244 return Err(anyhow!(message));
245 }
246 let result = response
247 .result
248 .ok_or_else(|| anyhow!("external adapter response missing result"))?;
249 serde_json::from_value(result).context("deserialize external adapter result")
250 }
251
252 async fn run(&self, payload: Vec<u8>) -> Result<std::process::Output> {
253 let mut cmd = Command::new(&self.command);
254 cmd.args(&self.args)
255 .stdin(Stdio::piped())
256 .stdout(Stdio::piped())
257 .stderr(Stdio::piped());
258 if let Some(dir) = &self.working_dir {
259 cmd.current_dir(dir);
260 }
261 for (key, value) in &self.env {
262 cmd.env(key, value);
263 }
264
265 let mut child = cmd.spawn().context("spawn external adapter")?;
266 if let Some(mut stdin) = child.stdin.take() {
267 stdin
268 .write_all(&payload)
269 .await
270 .context("write external adapter stdin")?;
271 }
272
273 let output = timeout(self.timeout, child.wait_with_output())
274 .await
275 .context("external adapter timed out")?
276 .context("wait for external adapter")?;
277
278 if !output.status.success() {
279 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
280 return Err(anyhow!(
281 "external adapter exited with {}: {}",
282 output.status,
283 stderr
284 ));
285 }
286
287 Ok(output)
288 }
289}
290
291#[async_trait::async_trait]
292impl Adapter for ProcessAdapter {
293 async fn read(
294 &self,
295 schema: &alembic_core::Schema,
296 types: &[alembic_core::TypeName],
297 state: &StateStore,
298 ) -> Result<ObservedState> {
299 let state = StateData {
300 mappings: state.all_mappings().clone(),
301 };
302 let objects: Vec<ObservedObjectData> = self
303 .call(ExternalRequest::Read {
304 schema,
305 types,
306 state,
307 })
308 .await?;
309 let mut observed = ObservedState::default();
310 for object in objects {
311 observed.insert(ObservedObject {
312 type_name: object.type_name,
313 key: object.key,
314 attrs: object.attrs,
315 backend_id: object.backend_id,
316 });
317 }
318 Ok(observed)
319 }
320
321 async fn write(
322 &self,
323 schema: &alembic_core::Schema,
324 ops: &[Op],
325 state: &StateStore,
326 ) -> Result<ApplyReport> {
327 let state = StateData {
328 mappings: state.all_mappings().clone(),
329 };
330 self.call(ExternalRequest::Write { schema, ops, state })
331 .await
332 }
333
334 async fn ensure_schema(&self, schema: &alembic_core::Schema) -> Result<ProvisionReport> {
335 self.call(ExternalRequest::EnsureSchema { schema }).await
336 }
337}
338
339#[derive(Debug, Serialize)]
340struct ExternalEnvelope<'a> {
341 version: u8,
342 #[serde(flatten)]
343 request: ExternalRequest<'a>,
344}
345
346#[derive(Debug, Serialize)]
347#[serde(tag = "method", rename_all = "snake_case")]
348enum ExternalRequest<'a> {
349 Read {
350 schema: &'a alembic_core::Schema,
351 types: &'a [alembic_core::TypeName],
352 state: StateData,
353 },
354 Write {
355 schema: &'a alembic_core::Schema,
356 ops: &'a [Op],
357 state: StateData,
358 },
359 EnsureSchema {
360 schema: &'a alembic_core::Schema,
361 },
362}
363
364#[derive(Debug, Deserialize)]
365struct ExternalResponse<T> {
366 ok: bool,
367 result: Option<T>,
368 error: Option<String>,
369}
370
371#[derive(Debug, Deserialize)]
372struct ObservedObjectData {
373 type_name: alembic_core::TypeName,
374 key: alembic_core::Key,
375 attrs: alembic_core::JsonMap,
376 backend_id: Option<BackendId>,
377}
378
379impl InfrahubSchemaConfig {
380 fn build(self) -> Result<Option<alembic_adapter_infrahub::SchemaPushConfig>> {
381 if self.mode == InfrahubSchemaMode::None {
382 return Ok(None);
383 }
384 let schema_path = self
385 .schema_path
386 .ok_or_else(|| anyhow!("infrahub schema requires schema_path"))?;
387 let mode = match self.mode {
388 InfrahubSchemaMode::Infrahubctl => {
389 alembic_adapter_infrahub::SchemaApplyMode::Infrahubctl
390 }
391 InfrahubSchemaMode::Repository => alembic_adapter_infrahub::SchemaApplyMode::Repository,
392 InfrahubSchemaMode::None => alembic_adapter_infrahub::SchemaApplyMode::Infrahubctl,
393 };
394 let config = alembic_adapter_infrahub::SchemaPushConfig {
395 schema_path,
396 mode,
397 repository_id: self.repository_id,
398 repository_name: self.repository_name,
399 repository_root: self.repository_root,
400 branch: self.branch,
401 infrahubctl_path: self.infrahubctl_path,
402 };
403
404 if config.mode == alembic_adapter_infrahub::SchemaApplyMode::Repository {
405 if config.repository_root.is_none() {
406 return Err(anyhow!(
407 "infrahub schema repository mode requires repository_root"
408 ));
409 }
410 if config.repository_id.is_none() && config.repository_name.is_none() {
411 return Err(anyhow!(
412 "infrahub schema repository mode requires repository_id or repository_name"
413 ));
414 }
415 }
416
417 Ok(Some(config))
418 }
419}
420
421pub fn create_adapter(
422 backend: Option<&str>,
423 config_path: Option<PathBuf>,
424) -> Result<Box<dyn Adapter>> {
425 let config = if let Some(path) = config_path {
426 let content = fs::read_to_string(&path)
427 .with_context(|| format!("read adapter config: {}", path.display()))?;
428 let config: AdapterConfig = serde_yaml::from_str(&content)
429 .with_context(|| format!("parse adapter config: {}", path.display()))?;
430 if let Some(backend) = backend {
431 if backend.to_lowercase() != config.backend_name() {
432 return Err(anyhow!(
433 "backend {backend} does not match config backend {}",
434 config.backend_name()
435 ));
436 }
437 }
438 config
439 } else {
440 let backend =
441 backend.ok_or_else(|| anyhow!("--backend or --backend-config is required"))?;
442 AdapterConfig::from_env(backend)?
443 };
444
445 config.build()
446}
447
448pub fn resolve_credentials(
449 prefix: &str,
450 url: Option<String>,
451 token: Option<String>,
452) -> Result<(String, String)> {
453 let env_url = format!("{}_URL", prefix);
454 let env_token = format!("{}_TOKEN", prefix);
455 let url = url
456 .or_else(|| std::env::var(&env_url).ok())
457 .ok_or_else(|| anyhow!("missing {env_url} (or url in backend config)"))?;
458 let token = token
459 .or_else(|| std::env::var(&env_token).ok())
460 .ok_or_else(|| anyhow!("missing {env_token} (or token in backend config)"))?;
461 Ok((url, token))
462}
463
464#[cfg(test)]
465mod tests {
466 use super::resolve_credentials;
467 use super::AdapterConfig;
468 use super::ExternalConfig;
469 use super::InfrahubSchemaConfig;
470 use super::InfrahubSchemaMode;
471 use alembic_core::{JsonMap, Key, Object, Schema, TypeName, Uid};
472 use alembic_engine::{BackendId, Op, StateData, StateStore};
473 use serde_json::json;
474 use std::collections::BTreeMap;
475 use std::fs;
476 use std::path::Path;
477 use std::sync::{Mutex, OnceLock};
478 use tempfile::tempdir;
479
480 fn env_lock() -> &'static Mutex<()> {
481 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
482 LOCK.get_or_init(|| Mutex::new(()))
483 }
484
485 #[test]
486 fn resolve_credentials_prefers_args() {
487 let _guard = env_lock().lock().unwrap();
488 let creds = resolve_credentials(
489 "NETBOX",
490 Some("http://example".to_string()),
491 Some("token".to_string()),
492 )
493 .unwrap();
494 assert_eq!(creds.0, "http://example");
495 assert_eq!(creds.1, "token");
496 }
497
498 #[test]
499 fn resolve_credentials_from_env() {
500 let _guard = env_lock().lock().unwrap();
501 let old_url = std::env::var("NETBOX_URL").ok();
502 let old_token = std::env::var("NETBOX_TOKEN").ok();
503 std::env::set_var("NETBOX_URL", "http://env");
504 std::env::set_var("NETBOX_TOKEN", "envtoken");
505
506 let result = std::panic::catch_unwind(|| {
507 let creds = resolve_credentials("NETBOX", None, None).unwrap();
508 assert_eq!(creds.0, "http://env");
509 assert_eq!(creds.1, "envtoken");
510 });
511
512 if let Some(value) = old_url {
513 std::env::set_var("NETBOX_URL", value);
514 } else {
515 std::env::remove_var("NETBOX_URL");
516 }
517 if let Some(value) = old_token {
518 std::env::set_var("NETBOX_TOKEN", value);
519 } else {
520 std::env::remove_var("NETBOX_TOKEN");
521 }
522
523 assert!(result.is_ok());
524 }
525
526 #[test]
527 fn resolve_credentials_missing_is_error() {
528 let _guard = env_lock().lock().unwrap();
529 let old_url = std::env::var("NETBOX_URL").ok();
530 let old_token = std::env::var("NETBOX_TOKEN").ok();
531 std::env::remove_var("NETBOX_URL");
532 std::env::remove_var("NETBOX_TOKEN");
533
534 let result = resolve_credentials("NETBOX", None, None);
535 assert!(result.is_err());
536
537 if let Some(value) = old_url {
538 std::env::set_var("NETBOX_URL", value);
539 }
540 if let Some(value) = old_token {
541 std::env::set_var("NETBOX_TOKEN", value);
542 }
543 }
544
545 #[test]
546 fn infrahub_schema_none_is_noop() {
547 let config = InfrahubSchemaConfig {
548 mode: InfrahubSchemaMode::None,
549 schema_path: None,
550 repository_id: None,
551 repository_name: None,
552 repository_root: None,
553 branch: None,
554 infrahubctl_path: None,
555 };
556 assert!(config.build().unwrap().is_none());
557 }
558
559 #[cfg(unix)]
560 fn make_executable(path: &Path) {
561 use std::os::unix::fs::PermissionsExt;
562 let mut perms = fs::metadata(path).unwrap().permissions();
563 perms.set_mode(0o755);
564 fs::set_permissions(path, perms).unwrap();
565 }
566
567 fn write_script(path: &Path, contents: &str) {
568 fs::write(path, contents).unwrap();
569 #[cfg(unix)]
570 make_executable(path);
571 }
572
573 #[tokio::test]
574 async fn external_adapter_roundtrip() {
575 let dir = tempdir().unwrap();
576 let script_path = dir.path().join("adapter.sh");
577 let script = r#"#!/usr/bin/env bash
578set -euo pipefail
579input="$(cat)"
580if [[ "$input" == *"\"method\":\"read\""* ]]; then
581 cat <<'JSON'
582{"ok":true,"result":[{"type_name":"dcim.site","key":{"name":"site-a"},"attrs":{"name":"Site A"},"backend_id":"site-1"}]}
583JSON
584elif [[ "$input" == *"\"method\":\"write\""* ]]; then
585 cat <<'JSON'
586{"ok":true,"result":{"applied":[{"uid":"00000000-0000-0000-0000-000000000001","type_name":"dcim.site","backend_id":"site-1"}]}}
587JSON
588elif [[ "$input" == *"\"method\":\"ensure_schema\""* ]]; then
589 cat <<'JSON'
590{"ok":true,"result":{"created_fields":["field1"],"created_tags":[],"created_object_types":["dcim.site"],"created_object_fields":["dcim.site.name"]}}
591JSON
592else
593 echo '{"ok":false,"error":"unknown method"}'
594fi
595"#;
596 write_script(&script_path, script);
597
598 let config = AdapterConfig::External(ExternalConfig {
599 command: Some(script_path.to_string_lossy().to_string()),
600 args: Vec::new(),
601 working_dir: None,
602 env: BTreeMap::new(),
603 timeout_seconds: Some(5),
604 });
605 let adapter = config.build().unwrap();
606 let schema = Schema {
607 types: BTreeMap::new(),
608 };
609 let state = StateStore::new(None, StateData::default());
610
611 let observed = adapter.read(&schema, &[], &state).await.unwrap();
612 assert_eq!(observed.by_key.len(), 1);
613
614 let uid = Uid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
615 let key = Key::from(BTreeMap::from([("name".to_string(), json!("site-a"))]));
616 let obj = Object {
617 uid,
618 type_name: TypeName::new("dcim.site"),
619 key,
620 attrs: JsonMap::from(BTreeMap::from([("name".to_string(), json!("Site A"))])),
621 source: None,
622 };
623 let ops = vec![Op::Create {
624 uid,
625 type_name: TypeName::new("dcim.site"),
626 desired: obj,
627 }];
628 let report = adapter.write(&schema, &ops, &state).await.unwrap();
629 assert_eq!(report.applied.len(), 1);
630 assert_eq!(
631 report.applied[0].backend_id,
632 Some(BackendId::String("site-1".to_string()))
633 );
634
635 let provision = adapter.ensure_schema(&schema).await.unwrap();
636 assert!(provision
637 .created_object_types
638 .contains(&"dcim.site".to_string()));
639 }
640}