alembic_adapter_peeringdb/
lib.rs1use alembic_core::{JsonMap, Key, Schema, TypeName};
7use alembic_engine::{Adapter, ApplyReport, BackendId, ObservedObject, ObservedState, Op};
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use serde::Serialize;
11use std::collections::{BTreeMap, BTreeSet};
12
13const SUPPORTED_TYPES: &[&str] = &[
15 "peeringdb.ix",
16 "peeringdb.net",
17 "peeringdb.org",
18 "peeringdb.netixlan",
19];
20
21pub struct PeeringDBAdapter;
22
23impl PeeringDBAdapter {
24 pub fn new() -> Self {
28 Self
29 }
30}
31
32impl Default for PeeringDBAdapter {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38#[async_trait]
39impl Adapter for PeeringDBAdapter {
40 async fn read(
41 &self,
42 schema: &Schema,
43 types: &[TypeName],
44 _state: &alembic_engine::StateStore,
45 ) -> Result<ObservedState> {
46 let requested: BTreeSet<TypeName> = if types.is_empty() {
47 SUPPORTED_TYPES.iter().map(|s| TypeName::new(*s)).collect()
48 } else {
49 types.iter().cloned().collect()
50 };
51
52 let mut state = ObservedState::default();
53
54 for type_name in requested {
55 let type_schema = schema
56 .types
57 .get(type_name.as_str())
58 .ok_or_else(|| anyhow!("missing schema for {}", type_name))?
59 .clone();
60
61 let objects = match type_name.as_str() {
62 "peeringdb.ix" => {
63 let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_ix)
64 .await?
65 .map_err(|e| anyhow!("failed to load ix data: {}", e))?;
66 to_observed_objects(&type_name, &type_schema, data)?
67 }
68 "peeringdb.net" => {
69 let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_net)
70 .await?
71 .map_err(|e| anyhow!("failed to load net data: {}", e))?;
72 to_observed_objects(&type_name, &type_schema, data)?
73 }
74 "peeringdb.org" => {
75 let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_org)
76 .await?
77 .map_err(|e| anyhow!("failed to load org data: {}", e))?;
78 to_observed_objects(&type_name, &type_schema, data)?
79 }
80 "peeringdb.netixlan" => {
81 let data = tokio::task::spawn_blocking(peeringdb_rs::load_peeringdb_netixlan)
82 .await?
83 .map_err(|e| anyhow!("failed to load netixlan data: {}", e))?;
84 to_observed_objects(&type_name, &type_schema, data)?
85 }
86 _ => continue, };
88
89 for object in objects {
90 state.insert(object);
91 }
92 }
93
94 Ok(state)
95 }
96
97 async fn write(
98 &self,
99 _schema: &Schema,
100 _ops: &[Op],
101 _state: &alembic_engine::StateStore,
102 ) -> Result<ApplyReport> {
103 Err(anyhow!("PeeringDB adapter is read-only"))
104 }
105}
106
107trait HasId {
109 fn id(&self) -> u32;
110}
111
112impl HasId for peeringdb_rs::PeeringdbIx {
113 fn id(&self) -> u32 {
114 self.id
115 }
116}
117
118impl HasId for peeringdb_rs::PeeringdbNet {
119 fn id(&self) -> u32 {
120 self.id
121 }
122}
123
124impl HasId for peeringdb_rs::PeeringdbOrg {
125 fn id(&self) -> u32 {
126 self.id
127 }
128}
129
130impl HasId for peeringdb_rs::PeeringdbNetixlan {
131 fn id(&self) -> u32 {
132 self.id
133 }
134}
135
136fn to_observed_objects<T: Serialize + HasId>(
137 type_name: &TypeName,
138 type_schema: &alembic_core::TypeSchema,
139 items: Vec<T>,
140) -> Result<Vec<ObservedObject>> {
141 let mut objects = Vec::new();
142
143 for item in items {
144 let id = item.id();
145 let value = serde_json::to_value(&item)?;
146 let attrs: JsonMap = match value {
147 serde_json::Value::Object(map) => map.into_iter().collect::<BTreeMap<_, _>>().into(),
148 _ => return Err(anyhow!("expected object from serialization")),
149 };
150
151 let key = build_key_from_schema(type_schema, &attrs)?;
152
153 objects.push(ObservedObject {
154 type_name: type_name.clone(),
155 key,
156 attrs,
157 backend_id: Some(BackendId::Int(id as u64)),
158 });
159 }
160
161 Ok(objects)
162}
163
164fn build_key_from_schema(type_schema: &alembic_core::TypeSchema, attrs: &JsonMap) -> Result<Key> {
165 let mut map = BTreeMap::new();
166 for field in type_schema.key.keys() {
167 let Some(value) = attrs.get(field) else {
168 return Err(anyhow!("missing key field {field}"));
169 };
170 map.insert(field.clone(), value.clone());
171 }
172 Ok(Key::from(map))
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178 use alembic_core::{FieldSchema, FieldType, TypeSchema};
179
180 fn ix_schema() -> TypeSchema {
181 TypeSchema {
182 key: BTreeMap::from([(
183 "name".to_string(),
184 FieldSchema {
185 r#type: FieldType::String,
186 required: true,
187 nullable: false,
188 description: None,
189 format: None,
190 pattern: None,
191 },
192 )]),
193 fields: BTreeMap::new(),
194 }
195 }
196
197 fn test_schema() -> Schema {
198 Schema {
199 types: BTreeMap::from([("peeringdb.ix".to_string(), ix_schema())]),
200 }
201 }
202
203 #[test]
204 fn new_creates_adapter() {
205 let _adapter = PeeringDBAdapter::new();
206 }
207
208 #[test]
209 fn build_key_extracts_fields() {
210 let schema = ix_schema();
211 let attrs: JsonMap = BTreeMap::from([
212 ("id".to_string(), serde_json::json!(1)),
213 ("name".to_string(), serde_json::json!("DE-CIX Frankfurt")),
214 ])
215 .into();
216 let key = build_key_from_schema(&schema, &attrs).unwrap();
217 assert_eq!(
218 key.get("name"),
219 Some(&serde_json::json!("DE-CIX Frankfurt"))
220 );
221 }
222
223 #[test]
224 fn build_key_errors_on_missing_field() {
225 let schema = ix_schema();
226 let attrs: JsonMap = BTreeMap::from([("id".to_string(), serde_json::json!(1))]).into();
227 let err = build_key_from_schema(&schema, &attrs).unwrap_err();
228 assert!(err.to_string().contains("missing key field name"));
229 }
230
231 #[tokio::test]
232 async fn apply_returns_read_only_error() {
233 let adapter = PeeringDBAdapter::new();
234 let schema = test_schema();
235 let state = alembic_engine::StateStore::new(None, alembic_engine::StateData::default());
236 let err = adapter.write(&schema, &[], &state).await.unwrap_err();
237 assert!(err.to_string().contains("read-only"));
238 }
239
240 #[tokio::test]
241 async fn observe_errors_on_missing_schema() {
242 let adapter = PeeringDBAdapter::new();
243 let schema = Schema {
244 types: BTreeMap::new(),
245 };
246 let state = alembic_engine::StateStore::new(None, alembic_engine::StateData::default());
247 let err = adapter
248 .read(&schema, &[TypeName::new("peeringdb.ix")], &state)
249 .await
250 .unwrap_err();
251 assert!(err.to_string().contains("missing schema"));
252 }
253
254 #[tokio::test]
255 async fn observe_skips_unsupported_types() {
256 let adapter = PeeringDBAdapter::new();
257 let schema = Schema {
258 types: BTreeMap::from([("peeringdb.unsupported".to_string(), ix_schema())]),
259 };
260 let state_store =
261 alembic_engine::StateStore::new(None, alembic_engine::StateData::default());
262 let state = adapter
263 .read(
264 &schema,
265 &[TypeName::new("peeringdb.unsupported")],
266 &state_store,
267 )
268 .await
269 .unwrap();
270
271 assert_eq!(state.by_key.len(), 0);
272 }
273}