Skip to main content

alembic_adapter_peeringdb/
lib.rs

//! PeeringDB adapter for alembic.
//!
//! Uses the `peeringdb-rs` crate to fetch data from PeeringDB.
//! Set the `PEERINGDB_API_KEY` environment variable to authenticate.
5
6use alembic_core::{JsonMap, Key, Schema, TypeName};
7use alembic_engine::{Adapter, ApplyReport, BackendId, ObservedObject, ObservedState, Op};
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use serde::Serialize;
11use std::collections::{BTreeMap, BTreeSet};
12
/// Type names this adapter knows how to read from PeeringDB.
///
/// When `read` is called with an empty type list, every entry here is requested.
const SUPPORTED_TYPES: &[&str] = &[
    "peeringdb.ix",
    "peeringdb.net",
    "peeringdb.org",
    "peeringdb.netixlan",
];
20
/// Adapter that exposes PeeringDB data to alembic.
///
/// The type is a stateless unit struct: it carries no configuration of its own, so it is
/// cheap to copy and can be created in `const` contexts.
#[derive(Debug, Clone, Copy, Default)]
pub struct PeeringDBAdapter;

impl PeeringDBAdapter {
    /// Create a new PeeringDB adapter.
    ///
    /// Authentication is handled via the `PEERINGDB_API_KEY` environment variable;
    /// nothing is read or stored by this constructor.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
37
38#[async_trait]
39impl Adapter for PeeringDBAdapter {
40    async fn read(
41        &self,
42        schema: &Schema,
43        types: &[TypeName],
44        _state: &alembic_engine::StateStore,
45    ) -> Result<ObservedState> {
46        let requested: BTreeSet<TypeName> = if types.is_empty() {
47            SUPPORTED_TYPES.iter().map(|s| TypeName::new(*s)).collect()
48        } else {
49            types.iter().cloned().collect()
50        };
51
52        let mut state = ObservedState::default();
53
54        for type_name in requested {
55            let type_schema = schema
56                .types
57                .get(type_name.as_str())
58                .ok_or_else(|| anyhow!("missing schema for {}", type_name))?
59                .clone();
60
61            let objects = match type_name.as_str() {
62                "peeringdb.ix" => {
63                    let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_ix)
64                        .await?
65                        .map_err(|e| anyhow!("failed to load ix data: {}", e))?;
66                    to_observed_objects(&type_name, &type_schema, data)?
67                }
68                "peeringdb.net" => {
69                    let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_net)
70                        .await?
71                        .map_err(|e| anyhow!("failed to load net data: {}", e))?;
72                    to_observed_objects(&type_name, &type_schema, data)?
73                }
74                "peeringdb.org" => {
75                    let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_org)
76                        .await?
77                        .map_err(|e| anyhow!("failed to load org data: {}", e))?;
78                    to_observed_objects(&type_name, &type_schema, data)?
79                }
80                "peeringdb.netixlan" => {
81                    let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_netixlan)
82                        .await?
83                        .map_err(|e| anyhow!("failed to load netixlan data: {}", e))?;
84                    to_observed_objects(&type_name, &type_schema, data)?
85                }
86                _ => continue, // Skip unsupported types
87            };
88
89            for object in objects {
90                state.insert(object);
91            }
92        }
93
94        Ok(state)
95    }
96
97    async fn write(
98        &self,
99        _schema: &Schema,
100        _ops: &[Op],
101        _state: &alembic_engine::StateStore,
102    ) -> Result<ApplyReport> {
103        Err(anyhow!("PeeringDB adapter is read-only"))
104    }
105}
106
107/// Trait for PeeringDB objects that have an id field.
108trait HasId {
109    fn id(&self) -> u32;
110}
111
112impl HasId for peeringdb_rs::PeeringdbIx {
113    fn id(&self) -> u32 {
114        self.id
115    }
116}
117
118impl HasId for peeringdb_rs::PeeringdbNet {
119    fn id(&self) -> u32 {
120        self.id
121    }
122}
123
124impl HasId for peeringdb_rs::PeeringdbOrg {
125    fn id(&self) -> u32 {
126        self.id
127    }
128}
129
130impl HasId for peeringdb_rs::PeeringdbNetixlan {
131    fn id(&self) -> u32 {
132        self.id
133    }
134}
135
136fn to_observed_objects<T: Serialize + HasId>(
137    type_name: &TypeName,
138    type_schema: &alembic_core::TypeSchema,
139    items: Vec<T>,
140) -> Result<Vec<ObservedObject>> {
141    let mut objects = Vec::new();
142
143    for item in items {
144        let id = item.id();
145        let value = serde_json::to_value(&item)?;
146        let attrs: JsonMap = match value {
147            serde_json::Value::Object(map) => map.into_iter().collect::<BTreeMap<_, _>>().into(),
148            _ => return Err(anyhow!("expected object from serialization")),
149        };
150
151        let key = build_key_from_schema(type_schema, &attrs)?;
152
153        objects.push(ObservedObject {
154            type_name: type_name.clone(),
155            key,
156            attrs,
157            backend_id: Some(BackendId::Int(id as u64)),
158        });
159    }
160
161    Ok(objects)
162}
163
164fn build_key_from_schema(type_schema: &alembic_core::TypeSchema, attrs: &JsonMap) -> Result<Key> {
165    let mut map = BTreeMap::new();
166    for field in type_schema.key.keys() {
167        let Some(value) = attrs.get(field) else {
168            return Err(anyhow!("missing key field {field}"));
169        };
170        map.insert(field.clone(), value.clone());
171    }
172    Ok(Key::from(map))
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use alembic_core::{FieldSchema, FieldType, TypeSchema};

    /// Type schema whose only key field is a required, non-nullable string `name`.
    fn ix_schema() -> TypeSchema {
        let name_field = FieldSchema {
            r#type: FieldType::String,
            required: true,
            nullable: false,
            description: None,
            format: None,
            pattern: None,
        };

        TypeSchema {
            key: BTreeMap::from([(String::from("name"), name_field)]),
            fields: BTreeMap::new(),
        }
    }

    /// Schema containing only the `peeringdb.ix` type.
    fn test_schema() -> Schema {
        let types = BTreeMap::from([(String::from("peeringdb.ix"), ix_schema())]);
        Schema { types }
    }

    /// Constructing the adapter must not panic.
    #[test]
    fn new_creates_adapter() {
        let _ = PeeringDBAdapter::new();
    }

    /// Key fields named by the schema are copied out of the attributes.
    #[test]
    fn build_key_extracts_fields() {
        let attrs: JsonMap = BTreeMap::from([
            (String::from("id"), serde_json::json!(1)),
            (String::from("name"), serde_json::json!("DE-CIX Frankfurt")),
        ])
        .into();

        let key = build_key_from_schema(&ix_schema(), &attrs).unwrap();

        let expected = serde_json::json!("DE-CIX Frankfurt");
        assert_eq!(key.get("name"), Some(&expected));
    }

    /// A key field missing from the attributes is reported by name.
    #[test]
    fn build_key_errors_on_missing_field() {
        let attrs: JsonMap = BTreeMap::from([(String::from("id"), serde_json::json!(1))]).into();

        let message = build_key_from_schema(&ix_schema(), &attrs)
            .unwrap_err()
            .to_string();

        assert!(message.contains("missing key field name"));
    }

    /// `write` is rejected because the adapter is read-only.
    #[tokio::test]
    async fn apply_returns_read_only_error() {
        let store = alembic_engine::StateStore::new(None, alembic_engine::StateData::default());

        let outcome = PeeringDBAdapter::new()
            .write(&test_schema(), &[], &store)
            .await;

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("read-only"));
    }

    /// `read` fails when the requested type has no schema entry.
    #[tokio::test]
    async fn observe_errors_on_missing_schema() {
        let empty_schema = Schema {
            types: BTreeMap::new(),
        };
        let store = alembic_engine::StateStore::new(None, alembic_engine::StateData::default());
        let wanted = [TypeName::new("peeringdb.ix")];

        let outcome = PeeringDBAdapter::new()
            .read(&empty_schema, &wanted, &store)
            .await;

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("missing schema"));
    }

    /// `read` yields no objects for a type the adapter does not support, provided the
    /// schema has an entry for it.
    #[tokio::test]
    async fn observe_skips_unsupported_types() {
        let types = BTreeMap::from([(String::from("peeringdb.unsupported"), ix_schema())]);
        let schema = Schema { types };
        let state_store =
            alembic_engine::StateStore::new(None, alembic_engine::StateData::default());
        let wanted = [TypeName::new("peeringdb.unsupported")];

        let state = PeeringDBAdapter::new()
            .read(&schema, &wanted, &state_store)
            .await
            .unwrap();

        assert_eq!(state.by_key.len(), 0);
    }
}