Skip to main content

trellis_core/
resource_reconcile.rs

1use crate::{
2    Graph, GraphError, GraphResult, ResourceCoalescedTrace, ResourceCommand, ResourceCommandCause,
3    ResourceCommandKind, ResourceKey, ResourcePayloadConflict, ResourcePlan, ScopeId,
4};
5use std::collections::BTreeSet;
6
7impl<C: Clone + PartialEq> Graph<C> {
8    pub(crate) fn produce_resource_plan(
9        &mut self,
10        closed_scopes: &[ScopeId],
11    ) -> GraphResult<ResourcePlan<C>> {
12        self.audit.pending_resource_causes.clear();
13        self.audit.pending_resource_coalescences.clear();
14        let planners = self.resource_planners.clone();
15        let mut plan = ResourcePlan::new();
16        for planner in planners {
17            if self.collection_diffs.contains_key(&planner.collection) {
18                let planned = planner.run(self)?;
19                let cause = ResourceCommandCause::Planner {
20                    collection: planner.collection,
21                };
22                let reconciled = self.reconcile_resource_plan(planner.scope, planned, cause)?;
23                plan.append(reconciled);
24            }
25        }
26
27        for scope in closed_scopes {
28            let close_plan = self.close_scope_resources(*scope);
29            plan.append(close_plan);
30        }
31
32        Ok(plan)
33    }
34
35    fn reconcile_resource_plan(
36        &mut self,
37        planner_scope: ScopeId,
38        plan: ResourcePlan<C>,
39        cause: ResourceCommandCause,
40    ) -> GraphResult<ResourcePlan<C>> {
41        let mut reconciled = ResourcePlan::new();
42        for command in plan.into_commands() {
43            if command.scope() != planner_scope {
44                return Err(GraphError::ResourceScopeMismatch(command.scope()));
45            }
46            self.reconcile_resource_command(command, &mut reconciled, cause)?;
47        }
48        Ok(reconciled)
49    }
50
51    fn reconcile_resource_command(
52        &mut self,
53        command: ResourceCommand<C>,
54        plan: &mut ResourcePlan<C>,
55        cause: ResourceCommandCause,
56    ) -> GraphResult<()> {
57        match command {
58            ResourceCommand::Open {
59                key,
60                scope,
61                command,
62            } => self.reconcile_open(key, scope, command, plan, cause),
63            ResourceCommand::Close { key, scope } => {
64                self.remove_resource_owner(&key, scope, plan, cause);
65                Ok(())
66            }
67            ResourceCommand::Replace {
68                key,
69                scope,
70                command,
71            } => {
72                self.require_scope_open(scope)?;
73                self.require_resource_owner(&key, scope, ResourceCommandKind::Replace)?;
74                self.resource_owners
75                    .entry(key.clone())
76                    .or_default()
77                    .insert(scope);
78                self.resource_payloads.insert(key.clone(), command.clone());
79                plan.replace(key, scope, command);
80                self.audit.pending_resource_causes.push(cause);
81                Ok(())
82            }
83            ResourceCommand::Refresh {
84                key,
85                scope,
86                command,
87            } => {
88                self.require_scope_open(scope)?;
89                self.require_resource_owner(&key, scope, ResourceCommandKind::Refresh)?;
90                self.resource_owners
91                    .entry(key.clone())
92                    .or_default()
93                    .insert(scope);
94                self.resource_payloads.insert(key.clone(), command.clone());
95                plan.refresh(key, scope, command);
96                self.audit.pending_resource_causes.push(cause);
97                Ok(())
98            }
99        }
100    }
101
102    fn reconcile_open(
103        &mut self,
104        key: ResourceKey,
105        scope: ScopeId,
106        command: C,
107        plan: &mut ResourcePlan<C>,
108        cause: ResourceCommandCause,
109    ) -> GraphResult<()> {
110        self.require_scope_open(scope)?;
111        let existing_payload = self.resource_payloads.get(&key);
112        let existing_owners = self.resource_owners.get(&key);
113        let was_empty = existing_owners.is_none_or(BTreeSet::is_empty);
114        let already_owned = existing_owners.is_some_and(|owners| owners.contains(&scope));
115        if let Some(existing_payload) = existing_payload
116            && existing_payload != &command
117        {
118            return Err(GraphError::ResourcePayloadConflict(
119                ResourcePayloadConflict {
120                    key,
121                    joining_scope: scope,
122                    existing_owners: existing_owners
123                        .into_iter()
124                        .flat_map(|owners| owners.iter().copied())
125                        .collect(),
126                },
127            ));
128        }
129        let existing_owner_count = existing_owners.map_or(0, BTreeSet::len);
130        let owners = self.resource_owners.entry(key.clone()).or_default();
131        owners.insert(scope);
132        if was_empty {
133            self.resource_payloads.insert(key.clone(), command.clone());
134            self.record_resource_acquisition(scope, &key);
135            plan.open(key, scope, command);
136            self.audit.pending_resource_causes.push(cause);
137        } else if !already_owned {
138            self.record_resource_acquisition(scope, &key);
139            self.audit
140                .pending_resource_coalescences
141                .push(ResourceCoalescedTrace {
142                    key,
143                    scope,
144                    existing_owner_count,
145                });
146        }
147        Ok(())
148    }
149
150    fn close_scope_resources(&mut self, scope: ScopeId) -> ResourcePlan<C> {
151        let mut keys: Vec<(u64, ResourceKey)> = self
152            .resource_acquisitions
153            .iter()
154            .filter(|((owner_scope, _), _)| *owner_scope == scope)
155            .map(|((_, key), sequence)| (*sequence, key.clone()))
156            .collect();
157        keys.sort_by(|left, right| right.0.cmp(&left.0).then_with(|| left.1.cmp(&right.1)));
158        let mut plan = ResourcePlan::new();
159        let cause = ResourceCommandCause::ScopeClosed { scope };
160        for (_, key) in keys {
161            self.remove_resource_owner(&key, scope, &mut plan, cause);
162        }
163        plan
164    }
165
166    fn remove_resource_owner(
167        &mut self,
168        key: &ResourceKey,
169        scope: ScopeId,
170        plan: &mut ResourcePlan<C>,
171        cause: ResourceCommandCause,
172    ) {
173        let Some(owners) = self.resource_owners.get_mut(key) else {
174            return;
175        };
176        owners.remove(&scope);
177        self.resource_acquisitions.remove(&(scope, key.clone()));
178        if owners.is_empty() {
179            self.resource_owners.remove(key);
180            self.resource_payloads.remove(key);
181            plan.close(key.clone(), scope);
182            self.audit.pending_resource_causes.push(cause);
183        }
184    }
185
186    fn require_resource_owner(
187        &self,
188        key: &ResourceKey,
189        scope: ScopeId,
190        command_kind: ResourceCommandKind,
191    ) -> GraphResult<()> {
192        let Some(owners) = self.resource_owners.get(key) else {
193            return Err(GraphError::ResourceNotOwned {
194                key: key.clone(),
195                scope,
196                command_kind,
197            });
198        };
199        if !owners.contains(&scope) {
200            return Err(GraphError::ResourceNotOwned {
201                key: key.clone(),
202                scope,
203                command_kind,
204            });
205        }
206        Ok(())
207    }
208}
209
210impl<C> Graph<C> {
211    pub(crate) fn require_scope_open(&self, scope: ScopeId) -> GraphResult<()> {
212        let scope_meta = self
213            .scope_meta(scope)
214            .ok_or(GraphError::UnknownScope(scope))?;
215        if scope_meta.is_closed() {
216            return Err(GraphError::ScopeAlreadyClosed(scope));
217        }
218        Ok(())
219    }
220
221    /// Returns resource owners in deterministic resource-key order.
222    pub fn resource_owners(&self, key: &ResourceKey) -> Option<&BTreeSet<ScopeId>> {
223        self.resource_owners.get(key)
224    }
225
226    pub(crate) fn take_pending_resource_coalescences(&mut self) -> Vec<ResourceCoalescedTrace> {
227        std::mem::take(&mut self.audit.pending_resource_coalescences)
228    }
229
230    fn record_resource_acquisition(&mut self, scope: ScopeId, key: &ResourceKey) {
231        let entry = (scope, key.clone());
232        if !self.resource_acquisitions.contains_key(&entry) {
233            let sequence = self.next_resource_acquisition;
234            self.next_resource_acquisition += 1;
235            self.resource_acquisitions.insert(entry, sequence);
236        }
237    }
238}