Skip to main content

trellis_testing/
resource_ledger.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use trellis_core::{
4    HostResourceCommandState, ResourceCoalescedTrace, ResourceCommand, ResourceCommandTrace,
5    ResourceKey, Revision, TransactionResult, classify_host_resource_status,
6};
7
8use crate::host_status::{HostStatusClass, HostStatusEvent, HostStatusIdentity, HostStatusRecord};
9use crate::{ResourceCommandContext, ResourceCommandRecord, ResourceSnapshot};
10
/// Fake resource lifecycle ledger for applying Trellis resource plans.
///
/// The ledger replays resource commands and host status events and keeps
/// enough bookkeeping (live snapshots, history, anomalies) for tests to
/// inspect what happened afterwards.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResourceLedger<C = ()> {
    /// Snapshots of resources that are currently live; an entry is removed
    /// once a close command leaves it without owners.
    pub(crate) resources: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
    /// Most recently recorded snapshot per key. Entries stay here after the
    /// resource has been closed and removed from `resources`.
    pub(crate) history: BTreeMap<ResourceKey, ResourceSnapshot<C>>,
    /// Contexts of close commands that targeted a key with no live snapshot
    /// or named a scope that was not an owner of the resource.
    pub(crate) duplicate_closes: Vec<ResourceCommandContext>,
    /// Keys registered through `mark_forbidden_unless_explicit`.
    pub(crate) forbidden: BTreeSet<ResourceKey>,
    /// Contexts of open commands that were applied to a key in `forbidden`.
    pub(crate) forbidden_opened: Vec<ResourceCommandContext>,
    /// Identities of status events that were classified as current; looked
    /// up when classifying later events to flag repeats as duplicates.
    pub(crate) accepted_status: BTreeSet<HostStatusIdentity>,
    /// Every classified status event together with its class, in the order
    /// the events were delivered.
    pub(crate) status_records: Vec<HostStatusRecord>,
    /// Resource command traces collected from applied transaction results.
    pub(crate) command_trace: Vec<ResourceCommandTrace>,
    /// One record per applied command, in application order.
    pub(crate) command_records: Vec<ResourceCommandRecord<C>>,
    /// Coalescence traces that did not add a new owner to a live resource.
    pub(crate) unexplained_coalescences: Vec<ResourceCoalescedTrace>,
}
25
26impl<C> Default for ResourceLedger<C> {
27    fn default() -> Self {
28        Self {
29            resources: BTreeMap::new(),
30            history: BTreeMap::new(),
31            duplicate_closes: Vec::new(),
32            forbidden: BTreeSet::new(),
33            forbidden_opened: Vec::new(),
34            accepted_status: BTreeSet::new(),
35            status_records: Vec::new(),
36            command_trace: Vec::new(),
37            command_records: Vec::new(),
38            unexplained_coalescences: Vec::new(),
39        }
40    }
41}
42
43impl<C> ResourceLedger<C> {
44    /// Creates an empty ledger.
45    pub fn new() -> Self {
46        Self::default()
47    }
48
49    /// Marks a key as forbidden unless the application explicitly permits it.
50    pub fn mark_forbidden_unless_explicit(&mut self, key: ResourceKey) {
51        self.forbidden.insert(key);
52    }
53
54    /// Returns the current snapshot for a resource.
55    pub fn snapshot(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
56        self.resources.get(key)
57    }
58
59    /// Returns the latest live or closed snapshot for a resource.
60    pub fn history(&self, key: &ResourceKey) -> Option<&ResourceSnapshot<C>> {
61        self.history.get(key)
62    }
63
64    /// Returns status classifications in delivery order.
65    pub fn status_records(&self) -> &[HostStatusRecord] {
66        &self.status_records
67    }
68
69    /// Returns applied resource command traces in delivery order.
70    pub fn command_trace(&self) -> &[ResourceCommandTrace] {
71        &self.command_trace
72    }
73
74    /// Returns applied command records with transaction/revision context.
75    pub fn command_records(&self) -> &[ResourceCommandRecord<C>] {
76        &self.command_records
77    }
78
79    pub(crate) fn context_for_key(&self, key: &ResourceKey) -> Option<ResourceCommandContext> {
80        self.resources
81            .get(key)
82            .or_else(|| self.history.get(key))
83            .map(ResourceSnapshot::command_context)
84    }
85}
86
87impl<C: Clone> ResourceLedger<C> {
88    /// Applies all resource commands from a transaction result.
89    pub fn apply_result(&mut self, result: &TransactionResult<C>) {
90        self.command_trace.extend(result.trace().resource_commands);
91        for command in result.resource_plan.commands() {
92            self.apply_command(command, result.transaction_id, result.revision);
93        }
94        for coalesced in &result.resource_coalescences {
95            self.apply_coalescence(coalesced);
96        }
97    }
98
99    fn apply_coalescence(&mut self, coalesced: &ResourceCoalescedTrace) {
100        let inserted = if let Some(snapshot) = self.resources.get_mut(&coalesced.key) {
101            snapshot.owners.insert(coalesced.scope)
102        } else {
103            false
104        };
105        if inserted {
106            self.record_history(&coalesced.key);
107        } else {
108            self.unexplained_coalescences.push(coalesced.clone());
109        }
110    }
111
112    /// Classifies a host status event without mutating graph state.
113    pub fn classify_status(&mut self, status: HostStatusEvent) -> HostStatusClass {
114        let (class, last_transaction_id, last_command_revision) = self.classify_status_ref(&status);
115        if class == HostStatusClass::Current {
116            self.accepted_status
117                .insert(HostStatusIdentity::from(&status));
118            if let Some(snapshot) = self.resources.get_mut(&status.resource_key) {
119                snapshot.last_status_revision = Some(status.status_revision);
120                snapshot.injected_status = Some(status.clone());
121            }
122            self.record_history(&status.resource_key);
123        }
124        self.status_records.push(HostStatusRecord {
125            status,
126            class,
127            last_transaction_id,
128            last_command_revision,
129        });
130        class
131    }
132
133    fn apply_command(
134        &mut self,
135        command: &ResourceCommand<C>,
136        transaction_id: trellis_core::TransactionId,
137        revision: Revision,
138    ) {
139        let generation = self.next_generation(command.key());
140        let record =
141            ResourceCommandRecord::from_command(command, transaction_id, revision, generation);
142        self.command_records.push(record.clone());
143        match command {
144            ResourceCommand::Open { key, scope, .. } => {
145                if self.forbidden.contains(key) {
146                    self.forbidden_opened.push(record.context.clone());
147                }
148                let snapshot = self.ensure_snapshot(key, record);
149                snapshot.owners.insert(*scope);
150                snapshot.is_open = true;
151                snapshot.open_count += 1;
152                self.record_history(key);
153            }
154            ResourceCommand::Close { key, scope } => {
155                let Some(snapshot) = self.resources.get_mut(key) else {
156                    self.duplicate_closes.push(record.context);
157                    return;
158                };
159                if !snapshot.owners.remove(scope) {
160                    self.duplicate_closes.push(record.context.clone());
161                }
162                snapshot.close_count += 1;
163                snapshot.record_command(record);
164                if snapshot.owners.is_empty() {
165                    snapshot.is_open = false;
166                    self.record_history(key);
167                    self.resources.remove(key);
168                } else {
169                    self.record_history(key);
170                }
171            }
172            ResourceCommand::Replace { key, scope, .. } => {
173                let snapshot = self.ensure_snapshot(key, record);
174                snapshot.owners.insert(*scope);
175                snapshot.is_open = true;
176                snapshot.replace_count += 1;
177                self.record_history(key);
178            }
179            ResourceCommand::Refresh { key, .. } => {
180                if let Some(snapshot) = self.resources.get_mut(key) {
181                    snapshot.record_command(record);
182                    self.record_history(key);
183                }
184            }
185        }
186    }
187
188    fn ensure_snapshot(
189        &mut self,
190        key: &ResourceKey,
191        record: ResourceCommandRecord<C>,
192    ) -> &mut ResourceSnapshot<C> {
193        let previous = self.history.get(key).cloned();
194        let snapshot = self
195            .resources
196            .entry(key.clone())
197            .or_insert_with(|| previous.unwrap_or_else(|| ResourceSnapshot::new(record.clone())));
198        snapshot.record_command(record);
199        snapshot
200    }
201
202    fn classify_status_ref(
203        &self,
204        status: &HostStatusEvent,
205    ) -> (
206        HostStatusClass,
207        Option<trellis_core::TransactionId>,
208        Option<Revision>,
209    ) {
210        let known = self.resources.get(&status.resource_key);
211        let historical = known.or_else(|| self.history.get(&status.resource_key));
212        let last_transaction_id = historical.map(|snapshot| snapshot.last_transaction_id);
213        let last_command_revision = historical.map(|snapshot| snapshot.command_revision);
214        let state = if let Some(snapshot) = known {
215            Some(HostResourceCommandState {
216                scope: snapshot.last_command.context.scope,
217                command_revision: snapshot.command_revision,
218                resource_is_live: true,
219                scope_owns_resource: snapshot.owners.contains(&status.scope),
220            })
221        } else {
222            historical.map(|snapshot| HostResourceCommandState {
223                scope: snapshot.last_command.context.scope,
224                command_revision: snapshot.command_revision,
225                resource_is_live: false,
226                scope_owns_resource: false,
227            })
228        };
229        let duplicate = self
230            .accepted_status
231            .contains(&HostStatusIdentity::from(status));
232        (
233            classify_host_resource_status(status, state, duplicate),
234            last_transaction_id,
235            last_command_revision,
236        )
237    }
238
239    fn next_generation(&self, key: &ResourceKey) -> u64 {
240        self.resources
241            .get(key)
242            .or_else(|| self.history.get(key))
243            .map_or(1, |snapshot| snapshot.generation + 1)
244    }
245
246    fn record_history(&mut self, key: &ResourceKey) {
247        if let Some(snapshot) = self.resources.get(key) {
248            self.history.insert(key.clone(), snapshot.clone());
249        }
250    }
251}