1use std::collections::{BTreeMap, BTreeSet};
2
3use trellis_core::{
4 HostResourceCommandState, ResourceCommand, ResourceCommandTrace, ResourceKey, Revision,
5 TransactionResult, classify_host_resource_status,
6};
7
8use crate::host_status::{HostStatusClass, HostStatusEvent, HostStatusIdentity, HostStatusRecord};
9use crate::{ResourceCommandContext, ResourceCommandRecord, ResourceSnapshot};
10
11#[derive(Clone, Debug, Eq, PartialEq)]
13pub struct ResourceLedger<C = ()> {
14 pub(crate) resources: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
15 pub(crate) history: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
16 pub(crate) duplicate_closes: Vec<ResourceCommandContext>,
17 pub(crate) forbidden: BTreeSet<ResourceKey>,
18 pub(crate) forbidden_opened: Vec<ResourceCommandContext>,
19 pub(crate) accepted_status: BTreeSet<HostStatusIdentity>,
20 pub(crate) status_records: Vec<HostStatusRecord>,
21 pub(crate) command_trace: Vec<ResourceCommandTrace>,
22 pub(crate) command_records: Vec<ResourceCommandRecord<C>>,
23}
24
25impl<C> Default for ResourceLedger<C> {
26 fn default() -> Self {
27 Self {
28 resources: BTreeMap::new(),
29 history: BTreeMap::new(),
30 duplicate_closes: Vec::new(),
31 forbidden: BTreeSet::new(),
32 forbidden_opened: Vec::new(),
33 accepted_status: BTreeSet::new(),
34 status_records: Vec::new(),
35 command_trace: Vec::new(),
36 command_records: Vec::new(),
37 }
38 }
39}
40
41impl<C> ResourceLedger<C> {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn mark_forbidden_unless_explicit(&mut self, key: ResourceKey) {
49 self.forbidden.insert(key);
50 }
51
52 pub fn snapshot(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
54 self.resources.get(key)
55 }
56
57 pub fn history(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
59 self.history.get(key)
60 }
61
62 pub fn status_records(&self) -> &[HostStatusRecord] {
64 &self.status_records
65 }
66
67 pub fn command_trace(&self) -> &[ResourceCommandTrace] {
69 &self.command_trace
70 }
71
72 pub fn command_records(&self) -> &[ResourceCommandRecord<C>] {
74 &self.command_records
75 }
76
77 pub(crate) fn context_for_key(&self, key: &ResourceKey) -> Option<ResourceCommandContext> {
78 self.resources
79 .get(key)
80 .or_else(|| self.history.get(key))
81 .map(ResourceSnapshot::command_context)
82 }
83}
84
85impl<C: Clone> ResourceLedger<C> {
86 pub fn apply_result(&mut self, result: &TransactionResult<C>) {
88 self.command_trace.extend(result.trace().resource_commands);
89 for command in result.resource_plan.commands() {
90 self.apply_command(command, result.transaction_id, result.revision);
91 }
92 }
93
94 pub fn classify_status(&mut self, status: HostStatusEvent) -> HostStatusClass {
96 let (class, last_transaction_id, last_command_revision) = self.classify_status_ref(&status);
97 if class == HostStatusClass::Current {
98 self.accepted_status
99 .insert(HostStatusIdentity::from(&status));
100 if let Some(snapshot) = self.resources.get_mut(&status.resource_key) {
101 snapshot.last_status_revision = Some(status.status_revision);
102 snapshot.injected_status = Some(status.clone());
103 }
104 self.record_history(&status.resource_key);
105 }
106 self.status_records.push(HostStatusRecord {
107 status,
108 class,
109 last_transaction_id,
110 last_command_revision,
111 });
112 class
113 }
114
115 fn apply_command(
116 &mut self,
117 command: &ResourceCommand<C>,
118 transaction_id: trellis_core::TransactionId,
119 revision: Revision,
120 ) {
121 let generation = self.next_generation(command.key());
122 let record =
123 ResourceCommandRecord::from_command(command, transaction_id, revision, generation);
124 self.command_records.push(record.clone());
125 match command {
126 ResourceCommand::Open { key, scope, .. } => {
127 if self.forbidden.contains(key) {
128 self.forbidden_opened.push(record.context.clone());
129 }
130 let snapshot = self.ensure_snapshot(key, record);
131 snapshot.owners.insert(*scope);
132 snapshot.is_open = true;
133 snapshot.open_count += 1;
134 self.record_history(key);
135 }
136 ResourceCommand::Close { key, scope } => {
137 let Some(snapshot) = self.resources.get_mut(key) else {
138 self.duplicate_closes.push(record.context);
139 return;
140 };
141 if !snapshot.owners.remove(scope) {
142 self.duplicate_closes.push(record.context.clone());
143 }
144 snapshot.close_count += 1;
145 snapshot.record_command(record);
146 if snapshot.owners.is_empty() {
147 snapshot.is_open = false;
148 self.record_history(key);
149 self.resources.remove(key);
150 } else {
151 self.record_history(key);
152 }
153 }
154 ResourceCommand::Replace { key, scope, .. } => {
155 let snapshot = self.ensure_snapshot(key, record);
156 snapshot.owners.insert(*scope);
157 snapshot.is_open = true;
158 snapshot.replace_count += 1;
159 self.record_history(key);
160 }
161 ResourceCommand::Refresh { key, .. } => {
162 if let Some(snapshot) = self.resources.get_mut(key) {
163 snapshot.record_command(record);
164 self.record_history(key);
165 }
166 }
167 }
168 }
169
170 fn ensure_snapshot(
171 &mut self,
172 key: &ResourceKey,
173 record: ResourceCommandRecord<C>,
174 ) -> &mut ResourceSnapshot<C> {
175 let previous = self.history.get(key).cloned();
176 let snapshot = self
177 .resources
178 .entry(key.clone())
179 .or_insert_with(|| previous.unwrap_or_else(|| ResourceSnapshot::new(record.clone())));
180 snapshot.record_command(record);
181 snapshot
182 }
183
184 fn classify_status_ref(
185 &self,
186 status: &HostStatusEvent,
187 ) -> (
188 HostStatusClass,
189 Option<trellis_core::TransactionId>,
190 Option<Revision>,
191 ) {
192 let known = self.resources.get(&status.resource_key);
193 let historical = known.or_else(|| self.history.get(&status.resource_key));
194 let last_transaction_id = historical.map(|snapshot| snapshot.last_transaction_id);
195 let last_command_revision = historical.map(|snapshot| snapshot.command_revision);
196 let state = if let Some(snapshot) = known {
197 Some(HostResourceCommandState {
198 scope: snapshot.last_command.context.scope,
199 command_revision: snapshot.command_revision,
200 resource_is_live: true,
201 scope_owns_resource: snapshot.owners.contains(&status.scope),
202 })
203 } else {
204 historical.map(|snapshot| HostResourceCommandState {
205 scope: snapshot.last_command.context.scope,
206 command_revision: snapshot.command_revision,
207 resource_is_live: false,
208 scope_owns_resource: false,
209 })
210 };
211 let duplicate = self
212 .accepted_status
213 .contains(&HostStatusIdentity::from(status));
214 (
215 classify_host_resource_status(status, state, duplicate),
216 last_transaction_id,
217 last_command_revision,
218 )
219 }
220
221 fn next_generation(&self, key: &ResourceKey) -> u64 {
222 self.resources
223 .get(key)
224 .or_else(|| self.history.get(key))
225 .map_or(1, |snapshot| snapshot.generation + 1)
226 }
227
228 fn record_history(&mut self, key: &ResourceKey) {
229 if let Some(snapshot) = self.resources.get(key) {
230 self.history.insert(key.clone(), snapshot.clone());
231 }
232 }
233}