1use crate::{
2 Graph, GraphError, GraphResult, ResourceCoalescedTrace, ResourceCommand, ResourceCommandCause,
3 ResourceCommandKind, ResourceKey, ResourcePayloadConflict, ResourcePlan, ScopeId,
4};
5use std::collections::BTreeSet;
6
7impl<C: Clone + PartialEq> Graph<C> {
8 pub(crate) fn produce_resource_plan(
9 &mut self,
10 closed_scopes: &[ScopeId],
11 ) -> GraphResult<ResourcePlan<C>> {
12 self.audit.pending_resource_causes.clear();
13 self.audit.pending_resource_coalescences.clear();
14 let planners = self.resource_planners.clone();
15 let mut plan = ResourcePlan::new();
16 for planner in planners {
17 if self.collection_diffs.contains_key(&planner.collection) {
18 let planned = planner.run(self)?;
19 let cause = ResourceCommandCause::Planner {
20 collection: planner.collection,
21 };
22 let reconciled = self.reconcile_resource_plan(planner.scope, planned, cause)?;
23 plan.append(reconciled);
24 }
25 }
26
27 for scope in closed_scopes {
28 let close_plan = self.close_scope_resources(*scope);
29 plan.append(close_plan);
30 }
31
32 Ok(plan)
33 }
34
35 fn reconcile_resource_plan(
36 &mut self,
37 planner_scope: ScopeId,
38 plan: ResourcePlan<C>,
39 cause: ResourceCommandCause,
40 ) -> GraphResult<ResourcePlan<C>> {
41 let mut reconciled = ResourcePlan::new();
42 for command in plan.into_commands() {
43 if command.scope() != planner_scope {
44 return Err(GraphError::ResourceScopeMismatch(command.scope()));
45 }
46 self.reconcile_resource_command(command, &mut reconciled, cause)?;
47 }
48 Ok(reconciled)
49 }
50
51 fn reconcile_resource_command(
52 &mut self,
53 command: ResourceCommand<C>,
54 plan: &mut ResourcePlan<C>,
55 cause: ResourceCommandCause,
56 ) -> GraphResult<()> {
57 match command {
58 ResourceCommand::Open {
59 key,
60 scope,
61 command,
62 } => self.reconcile_open(key, scope, command, plan, cause),
63 ResourceCommand::Close { key, scope } => {
64 self.remove_resource_owner(&key, scope, plan, cause);
65 Ok(())
66 }
67 ResourceCommand::Replace {
68 key,
69 scope,
70 command,
71 } => {
72 self.require_scope_open(scope)?;
73 self.require_resource_owner(&key, scope, ResourceCommandKind::Replace)?;
74 self.resource_owners
75 .entry(key.clone())
76 .or_default()
77 .insert(scope);
78 self.resource_payloads.insert(key.clone(), command.clone());
79 plan.replace(key, scope, command);
80 self.audit.pending_resource_causes.push(cause);
81 Ok(())
82 }
83 ResourceCommand::Refresh {
84 key,
85 scope,
86 command,
87 } => {
88 self.require_scope_open(scope)?;
89 self.require_resource_owner(&key, scope, ResourceCommandKind::Refresh)?;
90 self.resource_owners
91 .entry(key.clone())
92 .or_default()
93 .insert(scope);
94 self.resource_payloads.insert(key.clone(), command.clone());
95 plan.refresh(key, scope, command);
96 self.audit.pending_resource_causes.push(cause);
97 Ok(())
98 }
99 }
100 }
101
102 fn reconcile_open(
103 &mut self,
104 key: ResourceKey,
105 scope: ScopeId,
106 command: C,
107 plan: &mut ResourcePlan<C>,
108 cause: ResourceCommandCause,
109 ) -> GraphResult<()> {
110 self.require_scope_open(scope)?;
111 let existing_payload = self.resource_payloads.get(&key);
112 let existing_owners = self.resource_owners.get(&key);
113 let was_empty = existing_owners.is_none_or(BTreeSet::is_empty);
114 let already_owned = existing_owners.is_some_and(|owners| owners.contains(&scope));
115 if let Some(existing_payload) = existing_payload
116 && existing_payload != &command
117 {
118 return Err(GraphError::ResourcePayloadConflict(
119 ResourcePayloadConflict {
120 key,
121 joining_scope: scope,
122 existing_owners: existing_owners
123 .into_iter()
124 .flat_map(|owners| owners.iter().copied())
125 .collect(),
126 },
127 ));
128 }
129 let existing_owner_count = existing_owners.map_or(0, BTreeSet::len);
130 let owners = self.resource_owners.entry(key.clone()).or_default();
131 owners.insert(scope);
132 if was_empty {
133 self.resource_payloads.insert(key.clone(), command.clone());
134 self.record_resource_acquisition(scope, &key);
135 plan.open(key, scope, command);
136 self.audit.pending_resource_causes.push(cause);
137 } else if !already_owned {
138 self.record_resource_acquisition(scope, &key);
139 self.audit
140 .pending_resource_coalescences
141 .push(ResourceCoalescedTrace {
142 key,
143 scope,
144 existing_owner_count,
145 });
146 }
147 Ok(())
148 }
149
150 fn close_scope_resources(&mut self, scope: ScopeId) -> ResourcePlan<C> {
151 let mut keys: Vec<(u64, ResourceKey)> = self
152 .resource_acquisitions
153 .iter()
154 .filter(|((owner_scope, _), _)| *owner_scope == scope)
155 .map(|((_, key), sequence)| (*sequence, key.clone()))
156 .collect();
157 keys.sort_by(|left, right| right.0.cmp(&left.0).then_with(|| left.1.cmp(&right.1)));
158 let mut plan = ResourcePlan::new();
159 let cause = ResourceCommandCause::ScopeClosed { scope };
160 for (_, key) in keys {
161 self.remove_resource_owner(&key, scope, &mut plan, cause);
162 }
163 plan
164 }
165
166 fn remove_resource_owner(
167 &mut self,
168 key: &ResourceKey,
169 scope: ScopeId,
170 plan: &mut ResourcePlan<C>,
171 cause: ResourceCommandCause,
172 ) {
173 let Some(owners) = self.resource_owners.get_mut(key) else {
174 return;
175 };
176 owners.remove(&scope);
177 self.resource_acquisitions.remove(&(scope, key.clone()));
178 if owners.is_empty() {
179 self.resource_owners.remove(key);
180 self.resource_payloads.remove(key);
181 plan.close(key.clone(), scope);
182 self.audit.pending_resource_causes.push(cause);
183 }
184 }
185
186 fn require_resource_owner(
187 &self,
188 key: &ResourceKey,
189 scope: ScopeId,
190 command_kind: ResourceCommandKind,
191 ) -> GraphResult<()> {
192 let Some(owners) = self.resource_owners.get(key) else {
193 return Err(GraphError::ResourceNotOwned {
194 key: key.clone(),
195 scope,
196 command_kind,
197 });
198 };
199 if !owners.contains(&scope) {
200 return Err(GraphError::ResourceNotOwned {
201 key: key.clone(),
202 scope,
203 command_kind,
204 });
205 }
206 Ok(())
207 }
208}
209
210impl<C> Graph<C> {
211 pub(crate) fn require_scope_open(&self, scope: ScopeId) -> GraphResult<()> {
212 let scope_meta = self
213 .scope_meta(scope)
214 .ok_or(GraphError::UnknownScope(scope))?;
215 if scope_meta.is_closed() {
216 return Err(GraphError::ScopeAlreadyClosed(scope));
217 }
218 Ok(())
219 }
220
221 pub fn resource_owners(&self, key: &ResourceKey) -> Option<&BTreeSet<ScopeId>> {
223 self.resource_owners.get(key)
224 }
225
226 pub(crate) fn take_pending_resource_coalescences(&mut self) -> Vec<ResourceCoalescedTrace> {
227 std::mem::take(&mut self.audit.pending_resource_coalescences)
228 }
229
230 fn record_resource_acquisition(&mut self, scope: ScopeId, key: &ResourceKey) {
231 let entry = (scope, key.clone());
232 if !self.resource_acquisitions.contains_key(&entry) {
233 let sequence = self.next_resource_acquisition;
234 self.next_resource_acquisition += 1;
235 self.resource_acquisitions.insert(entry, sequence);
236 }
237 }
238}