Skip to main content

trellis_testing/
resource_ledger.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use trellis_core::{
4    HostResourceOutcome, ResourceCommand, ResourceCommandTrace, ResourceKey, Revision,
5    TransactionResult,
6};
7
8use crate::host_status::{HostStatusClass, HostStatusEvent, HostStatusIdentity, HostStatusRecord};
9use crate::{ResourceCommandContext, ResourceCommandRecord, ResourceSnapshot};
10
11/// Fake resource lifecycle ledger for applying Trellis resource plans.
12#[derive(Clone, Debug, Eq, PartialEq)]
13pub struct ResourceLedger<C = ()> {
14    pub(crate) resources: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
15    pub(crate) history: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
16    pub(crate) duplicate_closes: Vec<ResourceCommandContext>,
17    pub(crate) forbidden: BTreeSet<ResourceKey>,
18    pub(crate) forbidden_opened: Vec<ResourceCommandContext>,
19    pub(crate) accepted_status: BTreeSet<HostStatusIdentity>,
20    pub(crate) status_records: Vec<HostStatusRecord>,
21    pub(crate) command_trace: Vec<ResourceCommandTrace>,
22    pub(crate) command_records: Vec<ResourceCommandRecord<C>>,
23}
24
25impl<C> Default for ResourceLedger<C> {
26    fn default() -> Self {
27        Self {
28            resources: BTreeMap::new(),
29            history: BTreeMap::new(),
30            duplicate_closes: Vec::new(),
31            forbidden: BTreeSet::new(),
32            forbidden_opened: Vec::new(),
33            accepted_status: BTreeSet::new(),
34            status_records: Vec::new(),
35            command_trace: Vec::new(),
36            command_records: Vec::new(),
37        }
38    }
39}
40
41impl<C> ResourceLedger<C> {
42    /// Creates an empty ledger.
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Marks a key as forbidden unless the application explicitly permits it.
48    pub fn mark_forbidden_unless_explicit(&mut self, key: ResourceKey) {
49        self.forbidden.insert(key);
50    }
51
52    /// Returns the current snapshot for a resource.
53    pub fn snapshot(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
54        self.resources.get(key)
55    }
56
57    /// Returns the latest live or closed snapshot for a resource.
58    pub fn history(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
59        self.history.get(key)
60    }
61
62    /// Returns status classifications in delivery order.
63    pub fn status_records(&self) -> &[HostStatusRecord] {
64        &self.status_records
65    }
66
67    /// Returns applied resource command traces in delivery order.
68    pub fn command_trace(&self) -> &[ResourceCommandTrace] {
69        &self.command_trace
70    }
71
72    /// Returns applied command records with transaction/revision context.
73    pub fn command_records(&self) -> &[ResourceCommandRecord<C>] {
74        &self.command_records
75    }
76
77    pub(crate) fn context_for_key(&self, key: &ResourceKey) -> Option<ResourceCommandContext> {
78        self.resources
79            .get(key)
80            .or_else(|| self.history.get(key))
81            .map(ResourceSnapshot::command_context)
82    }
83}
84
85impl<C: Clone> ResourceLedger<C> {
86    /// Applies all resource commands from a transaction result.
87    pub fn apply_result<O>(&mut self, result: &TransactionResult<C, O>) {
88        self.command_trace.extend(result.trace().resource_commands);
89        for command in result.resource_plan.commands() {
90            self.apply_command(command, result.transaction_id, result.revision);
91        }
92    }
93
94    /// Classifies a host status event without mutating graph state.
95    pub fn classify_status(&mut self, status: HostStatusEvent) -> HostStatusClass {
96        let (class, last_transaction_id, last_command_revision) = self.classify_status_ref(&status);
97        if class == HostStatusClass::Current {
98            self.accepted_status
99                .insert(HostStatusIdentity::from(&status));
100            if let Some(snapshot) = self.resources.get_mut(&status.resource_key) {
101                snapshot.last_status_revision = Some(status.status_revision);
102                snapshot.injected_status = Some(status.clone());
103            }
104            self.record_history(&status.resource_key);
105        }
106        self.status_records.push(HostStatusRecord {
107            status,
108            class,
109            last_transaction_id,
110            last_command_revision,
111        });
112        class
113    }
114
115    fn apply_command(
116        &mut self,
117        command: &ResourceCommand<C>,
118        transaction_id: trellis_core::TransactionId,
119        revision: Revision,
120    ) {
121        let generation = self.next_generation(command.key());
122        let record =
123            ResourceCommandRecord::from_command(command, transaction_id, revision, generation);
124        self.command_records.push(record.clone());
125        match command {
126            ResourceCommand::Open { key, scope, .. } => {
127                if self.forbidden.contains(key) {
128                    self.forbidden_opened.push(record.context.clone());
129                }
130                let snapshot = self.ensure_snapshot(key, record);
131                snapshot.owners.insert(*scope);
132                snapshot.is_open = true;
133                snapshot.open_count += 1;
134                self.record_history(key);
135            }
136            ResourceCommand::Close { key, scope } => {
137                let Some(snapshot) = self.resources.get_mut(key) else {
138                    self.duplicate_closes.push(record.context);
139                    return;
140                };
141                if !snapshot.owners.remove(scope) {
142                    self.duplicate_closes.push(record.context.clone());
143                }
144                snapshot.close_count += 1;
145                snapshot.record_command(record);
146                if snapshot.owners.is_empty() {
147                    snapshot.is_open = false;
148                    self.record_history(key);
149                    self.resources.remove(key);
150                } else {
151                    self.record_history(key);
152                }
153            }
154            ResourceCommand::Replace { key, scope, .. } => {
155                let snapshot = self.ensure_snapshot(key, record);
156                snapshot.owners.insert(*scope);
157                snapshot.is_open = true;
158                snapshot.replace_count += 1;
159                self.record_history(key);
160            }
161            ResourceCommand::Refresh { key, .. } => {
162                if let Some(snapshot) = self.resources.get_mut(key) {
163                    snapshot.record_command(record);
164                    self.record_history(key);
165                }
166            }
167        }
168    }
169
170    fn ensure_snapshot(
171        &mut self,
172        key: &ResourceKey,
173        record: ResourceCommandRecord<C>,
174    ) -> &mut ResourceSnapshot<C> {
175        let previous = self.history.get(key).cloned();
176        let snapshot = self
177            .resources
178            .entry(key.clone())
179            .or_insert_with(|| previous.unwrap_or_else(|| ResourceSnapshot::new(record.clone())));
180        snapshot.record_command(record);
181        snapshot
182    }
183
184    fn classify_status_ref(
185        &self,
186        status: &HostStatusEvent,
187    ) -> (
188        HostStatusClass,
189        Option<trellis_core::TransactionId>,
190        Option<Revision>,
191    ) {
192        let known = self.resources.get(&status.resource_key);
193        let historical = known.or_else(|| self.history.get(&status.resource_key));
194        let last_transaction_id = historical.map(|snapshot| snapshot.last_transaction_id);
195        let last_command_revision = historical.map(|snapshot| snapshot.command_revision);
196        let Some(snapshot) = known else {
197            if let Some(snapshot) = historical
198                && matches!(status.status, HostResourceOutcome::Closed)
199                && snapshot.last_command.context.scope == status.scope
200            {
201                if status.command_revision < snapshot.command_revision {
202                    return (
203                        HostStatusClass::Stale,
204                        last_transaction_id,
205                        last_command_revision,
206                    );
207                }
208                if status.command_revision > snapshot.command_revision {
209                    return (
210                        HostStatusClass::Future,
211                        last_transaction_id,
212                        last_command_revision,
213                    );
214                }
215                if self
216                    .accepted_status
217                    .contains(&HostStatusIdentity::from(status))
218                {
219                    return (
220                        HostStatusClass::Duplicate,
221                        last_transaction_id,
222                        last_command_revision,
223                    );
224                }
225                return (
226                    HostStatusClass::Current,
227                    last_transaction_id,
228                    last_command_revision,
229                );
230            }
231            return (
232                HostStatusClass::Late,
233                last_transaction_id,
234                last_command_revision,
235            );
236        };
237        if !snapshot.owners.contains(&status.scope) {
238            return (
239                HostStatusClass::Late,
240                last_transaction_id,
241                last_command_revision,
242            );
243        }
244        if status.command_revision < snapshot.command_revision {
245            return (
246                HostStatusClass::Stale,
247                last_transaction_id,
248                last_command_revision,
249            );
250        }
251        if status.command_revision > snapshot.command_revision {
252            return (
253                HostStatusClass::Future,
254                last_transaction_id,
255                last_command_revision,
256            );
257        }
258        if self
259            .accepted_status
260            .contains(&HostStatusIdentity::from(status))
261        {
262            return (
263                HostStatusClass::Duplicate,
264                last_transaction_id,
265                last_command_revision,
266            );
267        }
268        (
269            HostStatusClass::Current,
270            last_transaction_id,
271            last_command_revision,
272        )
273    }
274
275    fn next_generation(&self, key: &ResourceKey) -> u64 {
276        self.resources
277            .get(key)
278            .or_else(|| self.history.get(key))
279            .map_or(1, |snapshot| snapshot.generation + 1)
280    }
281
282    fn record_history(&mut self, key: &ResourceKey) {
283        if let Some(snapshot) = self.resources.get(key) {
284            self.history.insert(key.clone(), snapshot.clone());
285        }
286    }
287}