palimpsest_dataflow/palimpsest/
shared.rs1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
9pub struct SharedSubgraphId(u64);
10
11impl SharedSubgraphId {
12 #[must_use]
14 pub const fn new(value: u64) -> Self {
15 Self(value)
16 }
17
18 #[must_use]
20 pub const fn get(self) -> u64 {
21 self.0
22 }
23}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum SharedSubgraphAcquire {
28 Created {
30 id: SharedSubgraphId,
32 },
33 Reused {
35 id: SharedSubgraphId,
37 ref_count: usize,
39 },
40}
41
42impl SharedSubgraphAcquire {
43 #[must_use]
45 pub const fn id(&self) -> SharedSubgraphId {
46 match self {
47 Self::Created { id } | Self::Reused { id, .. } => *id,
48 }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum SharedSubgraphRelease {
55 StillReferenced {
57 id: SharedSubgraphId,
59 ref_count: usize,
61 },
62 Teardown {
64 id: SharedSubgraphId,
66 key: String,
68 },
69}
70
71#[derive(Debug, Clone, Default, PartialEq, Eq)]
73pub struct SharedSubgraphRegistry {
74 next_id: u64,
75 ids_by_key: BTreeMap<String, SharedSubgraphId>,
76 entries: BTreeMap<SharedSubgraphId, SharedSubgraphEntry>,
77}
78
79impl SharedSubgraphRegistry {
80 #[must_use]
82 pub const fn new() -> Self {
83 Self {
84 next_id: 0,
85 ids_by_key: BTreeMap::new(),
86 entries: BTreeMap::new(),
87 }
88 }
89
90 pub fn acquire(&mut self, key: impl Into<String>) -> SharedSubgraphAcquire {
92 let key = key.into();
93 if let Some(id) = self.ids_by_key.get(&key).copied() {
94 let entry = self
95 .entries
96 .get_mut(&id)
97 .expect("shared subgraph key index points at a missing entry");
98 entry.ref_count = entry.ref_count.saturating_add(1);
99 return SharedSubgraphAcquire::Reused {
100 id,
101 ref_count: entry.ref_count,
102 };
103 }
104
105 let id = SharedSubgraphId(self.next_id);
106 self.next_id = self.next_id.saturating_add(1);
107 self.ids_by_key.insert(key.clone(), id);
108 self.entries
109 .insert(id, SharedSubgraphEntry { key, ref_count: 1 });
110 SharedSubgraphAcquire::Created { id }
111 }
112
113 pub fn release(&mut self, id: SharedSubgraphId) -> Option<SharedSubgraphRelease> {
115 let entry = self.entries.get_mut(&id)?;
116 entry.ref_count = entry.ref_count.saturating_sub(1);
117
118 if entry.ref_count > 0 {
119 return Some(SharedSubgraphRelease::StillReferenced {
120 id,
121 ref_count: entry.ref_count,
122 });
123 }
124
125 let entry = self.entries.remove(&id)?;
126 self.ids_by_key.remove(&entry.key);
127 Some(SharedSubgraphRelease::Teardown { id, key: entry.key })
128 }
129
130 #[must_use]
132 pub fn resolve(&self, key: &str) -> Option<SharedSubgraphId> {
133 self.ids_by_key.get(key).copied()
134 }
135
136 #[must_use]
138 pub fn ref_count(&self, id: SharedSubgraphId) -> Option<usize> {
139 self.entries.get(&id).map(|entry| entry.ref_count)
140 }
141
142 #[must_use]
144 pub fn len(&self) -> usize {
145 self.entries.len()
146 }
147
148 #[must_use]
150 pub fn is_empty(&self) -> bool {
151 self.entries.is_empty()
152 }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq)]
156struct SharedSubgraphEntry {
157 key: String,
158 ref_count: usize,
159}
160
161#[cfg(test)]
162mod tests {
163 use super::{
164 SharedSubgraphAcquire, SharedSubgraphId, SharedSubgraphRegistry, SharedSubgraphRelease,
165 };
166
167 #[test]
168 fn acquire_reuses_existing_subgraph_by_key() {
169 let mut registry = SharedSubgraphRegistry::new();
170 let first = registry.acquire("canonical:posts-by-author");
171 let id = first.id();
172
173 assert_eq!(first, SharedSubgraphAcquire::Created { id });
174 assert_eq!(
175 registry.acquire("canonical:posts-by-author"),
176 SharedSubgraphAcquire::Reused { id, ref_count: 2 }
177 );
178 assert_eq!(registry.resolve("canonical:posts-by-author"), Some(id));
179 assert_eq!(registry.ref_count(id), Some(2));
180 assert_eq!(registry.len(), 1);
181 }
182
183 #[test]
184 fn release_reports_teardown_on_last_reference() {
185 let mut registry = SharedSubgraphRegistry::new();
186 let id = registry.acquire("canonical:recent-posts").id();
187 registry.acquire("canonical:recent-posts");
188
189 assert_eq!(
190 registry.release(id),
191 Some(SharedSubgraphRelease::StillReferenced { id, ref_count: 1 })
192 );
193 assert_eq!(
194 registry.release(id),
195 Some(SharedSubgraphRelease::Teardown {
196 id,
197 key: "canonical:recent-posts".to_owned(),
198 })
199 );
200 assert_eq!(registry.resolve("canonical:recent-posts"), None);
201 assert!(registry.is_empty());
202 }
203
204 #[test]
205 fn release_unknown_subgraph_is_noop() {
206 let mut registry = SharedSubgraphRegistry::new();
207
208 assert_eq!(registry.release(SharedSubgraphId::new(99)), None);
209 }
210}