1use alembic_core::{key_string, JsonMap, Key, Object, Schema, TypeName, Uid};
4use anyhow::{anyhow, Result};
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::fmt;
9use std::hash::{DefaultHasher, Hash, Hasher};
10
11#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13#[serde(untagged)]
14pub enum BackendId {
15 Int(u64),
16 String(String),
17}
18
19impl fmt::Display for BackendId {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 match self {
22 BackendId::Int(id) => write!(f, "{}", id),
23 BackendId::String(id) => write!(f, "{}", id),
24 }
25 }
26}
27
28impl From<u64> for BackendId {
29 fn from(id: u64) -> Self {
30 BackendId::Int(id)
31 }
32}
33
34impl From<String> for BackendId {
35 fn from(id: String) -> Self {
36 BackendId::String(id)
37 }
38}
39
40#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
42pub struct FieldChange {
43 pub field: String,
45 pub from: serde_json::Value,
47 pub to: serde_json::Value,
49}
50
51#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
53#[serde(tag = "op", rename_all = "snake_case")]
54pub enum Op {
55 Create {
57 uid: Uid,
58 type_name: TypeName,
59 desired: Object,
60 },
61 Update {
63 uid: Uid,
64 type_name: TypeName,
65 desired: Object,
66 changes: Vec<FieldChange>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 backend_id: Option<BackendId>,
69 },
70 Delete {
72 uid: Uid,
73 type_name: TypeName,
74 key: Key,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 backend_id: Option<BackendId>,
77 },
78}
79
80impl Op {
81 pub fn uid(&self) -> Uid {
83 match self {
84 Op::Create { uid, .. } => *uid,
85 Op::Update { uid, .. } => *uid,
86 Op::Delete { uid, .. } => *uid,
87 }
88 }
89
90 pub fn type_name(&self) -> &TypeName {
92 match self {
93 Op::Create { type_name, .. } => type_name,
94 Op::Update { type_name, .. } => type_name,
95 Op::Delete { type_name, .. } => type_name,
96 }
97 }
98
99 pub fn hashed(&self) -> u64 {
100 let mut hasher = DefaultHasher::new();
101 self.hash(&mut hasher);
102 hasher.finish()
103 }
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct Plan {
109 pub schema: Schema,
111 pub ops: Vec<Op>,
113 #[serde(skip_serializing_if = "Option::is_none")]
115 pub summary: Option<PlanSummary>,
116}
117
118#[derive(Debug, Clone, Default, Serialize, Deserialize)]
120pub struct PlanSummary {
121 pub create: usize,
123 pub update: usize,
125 pub delete: usize,
127}
128
129impl Plan {
130 pub fn summary(&self) -> PlanSummary {
132 let mut summary = PlanSummary::default();
133 for op in &self.ops {
134 match op {
135 Op::Create { .. } => summary.create += 1,
136 Op::Update { .. } => summary.update += 1,
137 Op::Delete { .. } => summary.delete += 1,
138 }
139 }
140 summary
141 }
142}
143
144#[derive(Debug, Clone)]
146pub struct ObservedObject {
147 pub type_name: TypeName,
149 pub key: Key,
151 pub attrs: JsonMap,
153 pub backend_id: Option<BackendId>,
155}
156
157#[derive(Debug, Default, Clone)]
159pub struct ObservedState {
160 pub by_backend_id: BTreeMap<(TypeName, BackendId), ObservedObject>,
162 pub by_key: BTreeMap<(TypeName, String), ObservedObject>,
164}
165
166impl ObservedState {
167 pub fn insert(&mut self, object: ObservedObject) -> Result<()> {
170 if let Some(id) = &object.backend_id {
171 let key = (object.type_name.clone(), id.clone());
172 if self.by_backend_id.contains_key(&key) {
173 return Err(anyhow!(
174 "ObservedState already contains an object with backend id {} for type {}",
175 id,
176 object.type_name
177 ));
178 }
179 self.by_backend_id.insert(key, object.clone());
180 }
181
182 let key = (object.type_name.clone(), key_string(&object.key));
183 if self.by_key.contains_key(&key) {
184 return Err(anyhow!(
185 "ObservedState already contains an object with natural key {:?}",
186 key
187 ));
188 }
189 self.by_key.insert(key, object);
190
191 Ok(())
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct AppliedOp {
198 pub uid: Uid,
200 pub type_name: TypeName,
202 #[serde(skip_serializing_if = "Option::is_none")]
203 pub backend_id: Option<BackendId>,
205}
206
207#[derive(Debug, Clone, Default, Serialize, Deserialize)]
209pub struct ApplyReport {
210 pub applied: Vec<AppliedOp>,
212 #[serde(skip_serializing_if = "Option::is_none")]
214 pub previously_applied_count: Option<usize>,
215 #[serde(default)]
217 pub provision: ProvisionReport,
218}
219
220#[derive(Debug, Clone, Default, Serialize, Deserialize)]
222pub struct ProvisionReport {
223 pub created_fields: Vec<String>,
225 pub created_tags: Vec<String>,
227 #[serde(default, skip_serializing_if = "Vec::is_empty")]
229 pub created_object_types: Vec<String>,
230 #[serde(default, skip_serializing_if = "Vec::is_empty")]
232 pub created_object_fields: Vec<String>,
233 #[serde(default, skip_serializing_if = "Vec::is_empty")]
235 pub deprecated_object_types: Vec<String>,
236 #[serde(default, skip_serializing_if = "Vec::is_empty")]
238 pub deprecated_object_fields: Vec<String>,
239 #[serde(default, skip_serializing_if = "Vec::is_empty")]
241 pub deleted_object_types: Vec<String>,
242 #[serde(default, skip_serializing_if = "Vec::is_empty")]
244 pub deleted_object_fields: Vec<String>,
245}
246
247impl ProvisionReport {
248 pub fn is_empty(&self) -> bool {
249 self.created_fields.is_empty()
250 && self.created_tags.is_empty()
251 && self.created_object_types.is_empty()
252 && self.created_object_fields.is_empty()
253 && self.deprecated_object_types.is_empty()
254 && self.deprecated_object_fields.is_empty()
255 && self.deleted_object_types.is_empty()
256 && self.deleted_object_fields.is_empty()
257 }
258}
259
260impl fmt::Display for ProvisionReport {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 if self.is_empty() {
263 return write!(f, "no schema changes");
264 }
265
266 let mut first = true;
267 let sections: &[(&str, &[String])] = &[
268 ("fields created", &self.created_fields),
269 ("tags created", &self.created_tags),
270 ("object types created", &self.created_object_types),
271 ("object fields created", &self.created_object_fields),
272 ("object types deprecated", &self.deprecated_object_types),
273 ("object fields deprecated", &self.deprecated_object_fields),
274 ("object types deleted", &self.deleted_object_types),
275 ("object fields deleted", &self.deleted_object_fields),
276 ];
277
278 for (label, items) in sections {
279 if items.is_empty() {
280 continue;
281 }
282 if !first {
283 write!(f, ", ")?;
284 }
285 write!(f, "{} {label}", items.len())?;
286 first = false;
287 }
288
289 Ok(())
290 }
291}
292
293#[async_trait]
295pub trait Observer: Send + Sync {
296 async fn read(
297 &self,
298 schema: &Schema,
299 types: &[TypeName],
300 state: &crate::state::StateStore,
301 ) -> anyhow::Result<ObservedState>;
302}
303
304#[async_trait]
306pub trait Emitter: Send + Sync {
307 async fn write(
308 &self,
309 schema: &Schema,
310 ops: &[Op],
311 state: &crate::state::StateStore,
312 ) -> anyhow::Result<ApplyReport>;
313}
314
315#[async_trait]
317pub trait Adapter: Observer + Emitter {
318 async fn ensure_schema(&self, _schema: &Schema) -> anyhow::Result<ProvisionReport> {
319 Ok(ProvisionReport::default())
320 }
321}
322
323pub enum Backend {
325 Observer(Box<dyn Observer>),
327 Emitter(Box<dyn Emitter>),
329 Adapter(Box<dyn Adapter>),
331}
332
333impl Backend {
334 pub fn observer(&self) -> anyhow::Result<&dyn Observer> {
335 match self {
336 Backend::Observer(observer) => Ok(observer.as_ref()),
337 Backend::Adapter(adapter) => Ok(adapter.as_ref()),
338 Backend::Emitter(_) => Err(anyhow::anyhow!(
339 "backend is write-only; it cannot observe state"
340 )),
341 }
342 }
343
344 pub fn emitter(&self) -> anyhow::Result<&dyn Emitter> {
345 match self {
346 Backend::Emitter(emitter) => Ok(emitter.as_ref()),
347 Backend::Adapter(adapter) => Ok(adapter.as_ref()),
348 Backend::Observer(_) => Err(anyhow::anyhow!(
349 "backend is read-only; it cannot apply changes"
350 )),
351 }
352 }
353
354 pub fn adapter(&self) -> anyhow::Result<&dyn Adapter> {
355 match self {
356 Backend::Adapter(adapter) => Ok(adapter.as_ref()),
357 Backend::Observer(_) => Err(anyhow::anyhow!(
358 "backend is read-only; it cannot provision schema"
359 )),
360 Backend::Emitter(_) => Err(anyhow::anyhow!(
361 "backend is write-only; it cannot provision schema"
362 )),
363 }
364 }
365}
366
367#[cfg(test)]
368mod tests {
369 use super::*;
370 use alembic_core::{Key, TypeName, Uid};
371
372 #[test]
373 fn backend_id_serialization() {
374 let int_id = BackendId::Int(123);
375 let json = serde_json::to_string(&int_id).unwrap();
376 assert_eq!(json, "123");
377 let back: BackendId = serde_json::from_str(&json).unwrap();
378 assert_eq!(back, int_id);
379
380 let str_id = BackendId::String("uuid".to_string());
381 let json = serde_json::to_string(&str_id).unwrap();
382 assert_eq!(json, "\"uuid\"");
383 let back: BackendId = serde_json::from_str(&json).unwrap();
384 assert_eq!(back, str_id);
385 }
386
387 #[test]
388 fn op_helpers() {
389 let uid = Uid::from_u128(1);
390 let type_name = TypeName::new("test.type");
391 let op = Op::Delete {
392 uid,
393 type_name: type_name.clone(),
394 key: Key::default(),
395 backend_id: None,
396 };
397 assert_eq!(op.uid(), uid);
398 assert_eq!(op.type_name(), &type_name);
399 }
400}