1use std::collections::{BTreeMap, BTreeSet};
2
3use trellis_core::{
4 HostResourceCommandState, ResourceCoalescedTrace, ResourceCommand, ResourceCommandTrace,
5 ResourceKey, Revision, TransactionResult, classify_host_resource_status,
6};
7
8use crate::host_status::{HostStatusClass, HostStatusEvent, HostStatusIdentity, HostStatusRecord};
9use crate::{ResourceCommandContext, ResourceCommandRecord, ResourceSnapshot};
10
11#[derive(Clone, Debug, Eq, PartialEq)]
13pub struct ResourceLedger<C = ()> {
14 pub(crate) resources: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
15 pub(crate) history: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
16 pub(crate) duplicate_closes: Vec<ResourceCommandContext>,
17 pub(crate) forbidden: BTreeSet<ResourceKey>,
18 pub(crate) forbidden_opened: Vec<ResourceCommandContext>,
19 pub(crate) accepted_status: BTreeSet<HostStatusIdentity>,
20 pub(crate) status_records: Vec<HostStatusRecord>,
21 pub(crate) command_trace: Vec<ResourceCommandTrace>,
22 pub(crate) command_records: Vec<ResourceCommandRecord<C>>,
23 pub(crate) unexplained_coalescences: Vec<ResourceCoalescedTrace>,
24}
25
26impl<C> Default for ResourceLedger<C> {
27 fn default() -> Self {
28 Self {
29 resources: BTreeMap::new(),
30 history: BTreeMap::new(),
31 duplicate_closes: Vec::new(),
32 forbidden: BTreeSet::new(),
33 forbidden_opened: Vec::new(),
34 accepted_status: BTreeSet::new(),
35 status_records: Vec::new(),
36 command_trace: Vec::new(),
37 command_records: Vec::new(),
38 unexplained_coalescences: Vec::new(),
39 }
40 }
41}
42
43impl<C> ResourceLedger<C> {
44 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn mark_forbidden_unless_explicit(&mut self, key: ResourceKey) {
51 self.forbidden.insert(key);
52 }
53
54 pub fn snapshot(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
56 self.resources.get(key)
57 }
58
59 pub fn history(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
61 self.history.get(key)
62 }
63
64 pub fn status_records(&self) -> &[HostStatusRecord] {
66 &self.status_records
67 }
68
69 pub fn command_trace(&self) -> &[ResourceCommandTrace] {
71 &self.command_trace
72 }
73
74 pub fn command_records(&self) -> &[ResourceCommandRecord<C>] {
76 &self.command_records
77 }
78
79 pub(crate) fn context_for_key(&self, key: &ResourceKey) -> Option<ResourceCommandContext> {
80 self.resources
81 .get(key)
82 .or_else(|| self.history.get(key))
83 .map(ResourceSnapshot::command_context)
84 }
85}
86
87impl<C: Clone> ResourceLedger<C> {
88 pub fn apply_result(&mut self, result: &TransactionResult<C>) {
90 self.command_trace.extend(result.trace().resource_commands);
91 for command in result.resource_plan.commands() {
92 self.apply_command(command, result.transaction_id, result.revision);
93 }
94 for coalesced in &result.resource_coalescences {
95 self.apply_coalescence(coalesced);
96 }
97 }
98
99 fn apply_coalescence(&mut self, coalesced: &ResourceCoalescedTrace) {
100 let inserted = if let Some(snapshot) = self.resources.get_mut(&coalesced.key) {
101 snapshot.owners.insert(coalesced.scope)
102 } else {
103 false
104 };
105 if inserted {
106 self.record_history(&coalesced.key);
107 } else {
108 self.unexplained_coalescences.push(coalesced.clone());
109 }
110 }
111
112 pub fn classify_status(&mut self, status: HostStatusEvent) -> HostStatusClass {
114 let (class, last_transaction_id, last_command_revision) = self.classify_status_ref(&status);
115 if class == HostStatusClass::Current {
116 self.accepted_status
117 .insert(HostStatusIdentity::from(&status));
118 if let Some(snapshot) = self.resources.get_mut(&status.resource_key) {
119 snapshot.last_status_revision = Some(status.status_revision);
120 snapshot.injected_status = Some(status.clone());
121 }
122 self.record_history(&status.resource_key);
123 }
124 self.status_records.push(HostStatusRecord {
125 status,
126 class,
127 last_transaction_id,
128 last_command_revision,
129 });
130 class
131 }
132
133 fn apply_command(
134 &mut self,
135 command: &ResourceCommand<C>,
136 transaction_id: trellis_core::TransactionId,
137 revision: Revision,
138 ) {
139 let generation = self.next_generation(command.key());
140 let record =
141 ResourceCommandRecord::from_command(command, transaction_id, revision, generation);
142 self.command_records.push(record.clone());
143 match command {
144 ResourceCommand::Open { key, scope, .. } => {
145 if self.forbidden.contains(key) {
146 self.forbidden_opened.push(record.context.clone());
147 }
148 let snapshot = self.ensure_snapshot(key, record);
149 snapshot.owners.insert(*scope);
150 snapshot.is_open = true;
151 snapshot.open_count += 1;
152 self.record_history(key);
153 }
154 ResourceCommand::Close { key, scope } => {
155 let Some(snapshot) = self.resources.get_mut(key) else {
156 self.duplicate_closes.push(record.context);
157 return;
158 };
159 if !snapshot.owners.remove(scope) {
160 self.duplicate_closes.push(record.context.clone());
161 }
162 snapshot.close_count += 1;
163 snapshot.record_command(record);
164 if snapshot.owners.is_empty() {
165 snapshot.is_open = false;
166 self.record_history(key);
167 self.resources.remove(key);
168 } else {
169 self.record_history(key);
170 }
171 }
172 ResourceCommand::Replace { key, scope, .. } => {
173 let snapshot = self.ensure_snapshot(key, record);
174 snapshot.owners.insert(*scope);
175 snapshot.is_open = true;
176 snapshot.replace_count += 1;
177 self.record_history(key);
178 }
179 ResourceCommand::Refresh { key, .. } => {
180 if let Some(snapshot) = self.resources.get_mut(key) {
181 snapshot.record_command(record);
182 self.record_history(key);
183 }
184 }
185 }
186 }
187
188 fn ensure_snapshot(
189 &mut self,
190 key: &ResourceKey,
191 record: ResourceCommandRecord<C>,
192 ) -> &mut ResourceSnapshot<C> {
193 let previous = self.history.get(key).cloned();
194 let snapshot = self
195 .resources
196 .entry(key.clone())
197 .or_insert_with(|| previous.unwrap_or_else(|| ResourceSnapshot::new(record.clone())));
198 snapshot.record_command(record);
199 snapshot
200 }
201
202 fn classify_status_ref(
203 &self,
204 status: &HostStatusEvent,
205 ) -> (
206 HostStatusClass,
207 Option<trellis_core::TransactionId>,
208 Option<Revision>,
209 ) {
210 let known = self.resources.get(&status.resource_key);
211 let historical = known.or_else(|| self.history.get(&status.resource_key));
212 let last_transaction_id = historical.map(|snapshot| snapshot.last_transaction_id);
213 let last_command_revision = historical.map(|snapshot| snapshot.command_revision);
214 let state = if let Some(snapshot) = known {
215 Some(HostResourceCommandState {
216 scope: snapshot.last_command.context.scope,
217 command_revision: snapshot.command_revision,
218 resource_is_live: true,
219 scope_owns_resource: snapshot.owners.contains(&status.scope),
220 })
221 } else {
222 historical.map(|snapshot| HostResourceCommandState {
223 scope: snapshot.last_command.context.scope,
224 command_revision: snapshot.command_revision,
225 resource_is_live: false,
226 scope_owns_resource: false,
227 })
228 };
229 let duplicate = self
230 .accepted_status
231 .contains(&HostStatusIdentity::from(status));
232 (
233 classify_host_resource_status(status, state, duplicate),
234 last_transaction_id,
235 last_command_revision,
236 )
237 }
238
239 fn next_generation(&self, key: &ResourceKey) -> u64 {
240 self.resources
241 .get(key)
242 .or_else(|| self.history.get(key))
243 .map_or(1, |snapshot| snapshot.generation + 1)
244 }
245
246 fn record_history(&mut self, key: &ResourceKey) {
247 if let Some(snapshot) = self.resources.get(key) {
248 self.history.insert(key.clone(), snapshot.clone());
249 }
250 }
251}