Skip to main content

dirtydata_core/
patch.rs

1//! Patch Engine — the heart of DirtyData.
2
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5
6use crate::hash;
7use crate::ir::{Edge, Graph, Node};
8use crate::types::*;
9
10/// A single atomic patch — the unit of change.
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub struct Patch {
13    pub identity: PatchId,
14    pub operations: Vec<Operation>,
15    pub intent_ref: Option<IntentId>,
16    pub deterministic_hash: Hash,
17    pub parents: Vec<PatchId>,
18    pub parent_hashes: Vec<Hash>,
19    pub timestamp: Timestamp,
20    pub source: PatchSource,
21    pub trust: TrustLevel,
22}
23
24/// Atomic operations on the Canonical IR.
25#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
26pub enum Operation {
27    AddNode(Node),
28    RemoveNode(StableId),
29    ReplaceNode(Node),
30    ModifyConfig { node_id: StableId, delta: ConfigDelta },
31    AddEdge(Edge),
32    RemoveEdge(StableId),
33    ModifyEdge { edge_id: StableId, delta: EdgeDelta },
34    AddModulation(crate::ir::Modulation),
35    RemoveModulation(StableId),
36}
37
38#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
39pub struct PatchSet {
40    pub patches: Vec<Patch>,
41}
42
43#[derive(Debug, Clone, thiserror::Error)]
44pub enum PatchError {
45    #[error("node {0} not found")] NodeNotFound(StableId),
46    #[error("edge {0} not found")] EdgeNotFound(StableId),
47    #[error("node {0} already exists")] NodeAlreadyExists(StableId),
48    #[error("edge {0} already exists")] EdgeAlreadyExists(StableId),
49    #[error("port '{port}' not found on node {node}")] PortNotFound { node: StableId, port: String },
50    #[error("hash mismatch: expected {expected}, got {actual}")] HashMismatch { expected: String, actual: String },
51    #[error("merge conflict: {0}")] MergeConflict(String),
52}
53
54impl Patch {
55    pub fn from_operations(operations: Vec<Operation>) -> Self {
56        let mut patch = Self {
57            identity: PatchId::new(),
58            operations,
59            intent_ref: None,
60            deterministic_hash: [0u8; 32],
61            parents: Vec::new(),
62            parent_hashes: Vec::new(),
63            timestamp: Timestamp::now(),
64            source: PatchSource::System,
65            trust: TrustLevel::Trusted,
66        };
67        patch.deterministic_hash = hash::hash_patch(&patch);
68        patch
69    }
70
71    pub fn from_operations_with_provenance(operations: Vec<Operation>, source: PatchSource, trust: TrustLevel) -> Self {
72        let mut patch = Self {
73            identity: PatchId::new(),
74            operations,
75            intent_ref: None,
76            deterministic_hash: [0u8; 32],
77            parents: Vec::new(),
78            parent_hashes: Vec::new(),
79            timestamp: Timestamp::now(),
80            source,
81            trust,
82        };
83        patch.deterministic_hash = hash::hash_patch(&patch);
84        patch
85    }
86
87    pub fn with_intent(mut self, intent_id: IntentId) -> Self {
88        self.intent_ref = Some(intent_id);
89        self.deterministic_hash = hash::hash_patch(&self);
90        self
91    }
92
93    pub fn with_parents(mut self, parents: Vec<(PatchId, Hash)>) -> Self {
94        self.parents = parents.iter().map(|(id, _)| *id).collect();
95        self.parent_hashes = parents.iter().map(|(_, h)| *h).collect();
96        self.deterministic_hash = hash::hash_patch(&self);
97        self
98    }
99
100    pub fn verify_hash(&self) -> bool {
101        hash::hash_patch(self) == self.deterministic_hash
102    }
103}
104
105impl Graph {
106    pub fn apply_patch(&mut self, patch: &Patch) -> Result<(), PatchError> {
107        for op in &patch.operations {
108            self.apply_operation(op)?;
109        }
110        self.revision = self.revision.next();
111        self.lineage.applied_patches.push(patch.identity);
112        self.lineage.history.insert(patch.identity, patch.clone());
113        self.sync();
114        Ok(())
115    }
116
117    fn apply_operation(&mut self, op: &Operation) -> Result<(), PatchError> {
118        match op {
119            Operation::AddNode(node) => {
120                if self.topology.nodes.contains_key(&node.id) { return Err(PatchError::NodeAlreadyExists(node.id)); }
121                self.topology.nodes.insert(node.id, node.clone());
122            }
123            Operation::RemoveNode(id) => {
124                if self.topology.nodes.remove(id).is_none() { return Err(PatchError::NodeNotFound(*id)); }
125                self.topology.edges.retain(|_, e| e.source.node_id != *id && e.target.node_id != *id);
126            }
127            Operation::ReplaceNode(node) => {
128                if !self.topology.nodes.contains_key(&node.id) { return Err(PatchError::NodeNotFound(node.id)); }
129                self.topology.nodes.insert(node.id, node.clone());
130            }
131            Operation::ModifyConfig { node_id, delta } => {
132                let node = self.topology.nodes.get_mut(node_id).ok_or(PatchError::NodeNotFound(*node_id))?;
133                for (key, change) in delta {
134                    match &change.new {
135                        Some(val) => { node.config.insert(key.clone(), val.clone()); }
136                        None => { node.config.remove(key); }
137                    }
138                }
139            }
140            Operation::AddEdge(edge) => {
141                if self.topology.edges.contains_key(&edge.id) { return Err(PatchError::EdgeAlreadyExists(edge.id)); }
142                self.require_port(&edge.source)?;
143                self.require_port(&edge.target)?;
144                self.topology.edges.insert(edge.id, edge.clone());
145            }
146            Operation::RemoveEdge(id) => {
147                if self.topology.edges.remove(id).is_none() { return Err(PatchError::EdgeNotFound(*id)); }
148            }
149            Operation::ModifyEdge { edge_id, delta } => {
150                if let Some(ref src) = delta.source { self.require_port(src)?; }
151                if let Some(ref tgt) = delta.target { self.require_port(tgt)?; }
152                let edge = self.topology.edges.get_mut(edge_id).ok_or(PatchError::EdgeNotFound(*edge_id))?;
153                if let Some(ref src) = delta.source { edge.source = src.clone(); }
154                if let Some(ref tgt) = delta.target { edge.target = tgt.clone(); }
155                if let Some(k) = delta.kind { edge.kind = k; }
156            }
157            Operation::AddModulation(m) => {
158                if self.topology.modulations.contains_key(&m.id) { return Err(PatchError::EdgeAlreadyExists(m.id)); }
159                self.require_port(&m.source)?;
160                if !self.topology.nodes.contains_key(&m.target_node) { return Err(PatchError::NodeNotFound(m.target_node)); }
161                self.topology.modulations.insert(m.id, m.clone());
162            }
163            Operation::RemoveModulation(id) => {
164                if self.topology.modulations.remove(id).is_none() { return Err(PatchError::EdgeNotFound(*id)); }
165            }
166        }
167        Ok(())
168    }
169
170    fn require_port(&self, port_ref: &PortRef) -> Result<(), PatchError> {
171        let node = self.topology.nodes.get(&port_ref.node_id).ok_or(PatchError::NodeNotFound(port_ref.node_id))?;
172        if !node.ports.iter().any(|p| p.name == port_ref.port_name) {
173            return Err(PatchError::PortNotFound { node: port_ref.node_id, port: port_ref.port_name.clone() });
174        }
175        Ok(())
176    }
177
178    pub fn diff(&self, other: &Graph) -> PatchSet {
179        let mut operations = Vec::new();
180        for id in self.topology.nodes.keys() {
181            if !other.topology.nodes.contains_key(id) { operations.push(Operation::RemoveNode(*id)); }
182        }
183        for (id, node) in &other.topology.nodes {
184            match self.topology.nodes.get(id) {
185                None => operations.push(Operation::AddNode(node.clone())),
186                Some(old) => {
187                    if old.config != node.config {
188                        let delta = config_diff(&old.config, &node.config);
189                        if !delta.is_empty() { operations.push(Operation::ModifyConfig { node_id: *id, delta }); }
190                    }
191                }
192            }
193        }
194        for id in self.topology.edges.keys() {
195            if !other.topology.edges.contains_key(id) { operations.push(Operation::RemoveEdge(*id)); }
196        }
197        for (id, edge) in &other.topology.edges {
198            match self.topology.edges.get(id) {
199                None => operations.push(Operation::AddEdge(edge.clone())),
200                Some(old) => {
201                    if old != edge {
202                        let delta = EdgeDelta {
203                            source: if old.source != edge.source { Some(edge.source.clone()) } else { None },
204                            target: if old.target != edge.target { Some(edge.target.clone()) } else { None },
205                            kind: if old.kind != edge.kind { Some(edge.kind) } else { None },
206                        };
207                        operations.push(Operation::ModifyEdge { edge_id: *id, delta });
208                    }
209                }
210            }
211        }
212        PatchSet { patches: vec![Patch::from_operations(operations)] }
213    }
214
215    pub fn replay(patches: &[Patch]) -> Result<Self, PatchError> {
216        let mut graph = Graph::new();
217        for patch in patches { graph.apply_patch(patch)?; }
218        Ok(graph)
219    }
220
221    pub fn replay_and_verify(patches: &[Patch], expected_hash: &Hash) -> Result<Self, PatchError> {
222        let graph = Self::replay(patches)?;
223        let actual_hash = hash::hash_graph(&graph);
224        if &actual_hash != expected_hash {
225            return Err(PatchError::HashMismatch { expected: hex::encode(expected_hash), actual: hex::encode(&actual_hash) });
226        }
227        Ok(graph)
228    }
229}
230
231impl PatchSet {
232    pub fn new() -> Self { Self { patches: Vec::new() } }
233    pub fn single(patch: Patch) -> Self { Self { patches: vec![patch] } }
234    pub fn merge(&self, other: &PatchSet) -> Result<PatchSet, PatchError> {
235        let mut merged = self.patches.clone();
236        merged.extend(other.patches.iter().cloned());
237        Ok(PatchSet { patches: merged })
238    }
239    pub fn is_empty(&self) -> bool { self.patches.is_empty() }
240    pub fn len(&self) -> usize { self.patches.len() }
241}
242
243impl Default for PatchSet { fn default() -> Self { Self::new() } }
244
245pub fn config_diff(old: &ConfigSnapshot, new: &ConfigSnapshot) -> ConfigDelta {
246    let mut delta = BTreeMap::new();
247    for (key, old_val) in old {
248        match new.get(key) {
249            Some(new_val) if old_val != new_val => {
250                delta.insert(key.clone(), ConfigChange { old: Some(old_val.clone()), new: Some(new_val.clone()) });
251            }
252            None => {
253                delta.insert(key.clone(), ConfigChange { old: Some(old_val.clone()), new: None });
254            }
255            _ => {}
256        }
257    }
258    for (key, new_val) in new {
259        if !old.contains_key(key) {
260            delta.insert(key.clone(), ConfigChange { old: None, new: Some(new_val.clone()) });
261        }
262    }
263    delta
264}
265
266mod hex {
267    pub fn encode(bytes: &[u8]) -> String { bytes.iter().map(|b| format!("{:02x}", b)).collect() }
268}