Skip to main content

palimpsest_dataflow/palimpsest/
shared.rs

1//! Shared subgraph reference tracking.
2
3use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6
7/// Stable identifier for a shared dataflow subgraph.
8#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
9pub struct SharedSubgraphId(u64);
10
11impl SharedSubgraphId {
12    /// Creates a shared subgraph identifier.
13    #[must_use]
14    pub const fn new(value: u64) -> Self {
15        Self(value)
16    }
17
18    /// Returns the raw identifier.
19    #[must_use]
20    pub const fn get(self) -> u64 {
21        self.0
22    }
23}
24
25/// Result of acquiring a shared subgraph.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum SharedSubgraphAcquire {
28    /// The registry created a new subgraph slot for this key.
29    Created {
30        /// Shared subgraph identifier.
31        id: SharedSubgraphId,
32    },
33    /// The registry reused an existing subgraph slot.
34    Reused {
35        /// Shared subgraph identifier.
36        id: SharedSubgraphId,
37        /// New reference count after the acquire.
38        ref_count: usize,
39    },
40}
41
42impl SharedSubgraphAcquire {
43    /// Returns the acquired shared subgraph identifier.
44    #[must_use]
45    pub const fn id(&self) -> SharedSubgraphId {
46        match self {
47            Self::Created { id } | Self::Reused { id, .. } => *id,
48        }
49    }
50}
51
52/// Result of releasing a shared subgraph reference.
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum SharedSubgraphRelease {
55    /// Other subscribers still hold references.
56    StillReferenced {
57        /// Shared subgraph identifier.
58        id: SharedSubgraphId,
59        /// Remaining reference count.
60        ref_count: usize,
61    },
62    /// The last subscriber was released and the runtime should tear down the subgraph.
63    Teardown {
64        /// Shared subgraph identifier.
65        id: SharedSubgraphId,
66        /// Canonical key associated with the subgraph.
67        key: String,
68    },
69}
70
71/// Reference-counted registry for canonical shared subgraphs.
72#[derive(Debug, Clone, Default, PartialEq, Eq)]
73pub struct SharedSubgraphRegistry {
74    next_id: u64,
75    ids_by_key: BTreeMap<String, SharedSubgraphId>,
76    entries: BTreeMap<SharedSubgraphId, SharedSubgraphEntry>,
77}
78
79impl SharedSubgraphRegistry {
80    /// Creates an empty shared subgraph registry.
81    #[must_use]
82    pub const fn new() -> Self {
83        Self {
84            next_id: 0,
85            ids_by_key: BTreeMap::new(),
86            entries: BTreeMap::new(),
87        }
88    }
89
90    /// Acquires a reference to the canonical subgraph for `key`.
91    pub fn acquire(&mut self, key: impl Into<String>) -> SharedSubgraphAcquire {
92        let key = key.into();
93        if let Some(id) = self.ids_by_key.get(&key).copied() {
94            let entry = self
95                .entries
96                .get_mut(&id)
97                .expect("shared subgraph key index points at a missing entry");
98            entry.ref_count = entry.ref_count.saturating_add(1);
99            return SharedSubgraphAcquire::Reused {
100                id,
101                ref_count: entry.ref_count,
102            };
103        }
104
105        let id = SharedSubgraphId(self.next_id);
106        self.next_id = self.next_id.saturating_add(1);
107        self.ids_by_key.insert(key.clone(), id);
108        self.entries
109            .insert(id, SharedSubgraphEntry { key, ref_count: 1 });
110        SharedSubgraphAcquire::Created { id }
111    }
112
113    /// Releases one reference and reports whether teardown is now required.
114    pub fn release(&mut self, id: SharedSubgraphId) -> Option<SharedSubgraphRelease> {
115        let entry = self.entries.get_mut(&id)?;
116        entry.ref_count = entry.ref_count.saturating_sub(1);
117
118        if entry.ref_count > 0 {
119            return Some(SharedSubgraphRelease::StillReferenced {
120                id,
121                ref_count: entry.ref_count,
122            });
123        }
124
125        let entry = self.entries.remove(&id)?;
126        self.ids_by_key.remove(&entry.key);
127        Some(SharedSubgraphRelease::Teardown { id, key: entry.key })
128    }
129
130    /// Resolves a canonical key to an active shared subgraph identifier.
131    #[must_use]
132    pub fn resolve(&self, key: &str) -> Option<SharedSubgraphId> {
133        self.ids_by_key.get(key).copied()
134    }
135
136    /// Returns the active reference count for `id`.
137    #[must_use]
138    pub fn ref_count(&self, id: SharedSubgraphId) -> Option<usize> {
139        self.entries.get(&id).map(|entry| entry.ref_count)
140    }
141
142    /// Returns the number of active shared subgraphs.
143    #[must_use]
144    pub fn len(&self) -> usize {
145        self.entries.len()
146    }
147
148    /// Returns true when no shared subgraphs are active.
149    #[must_use]
150    pub fn is_empty(&self) -> bool {
151        self.entries.is_empty()
152    }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq)]
156struct SharedSubgraphEntry {
157    key: String,
158    ref_count: usize,
159}
160
#[cfg(test)]
mod tests {
    use super::{
        SharedSubgraphAcquire, SharedSubgraphId, SharedSubgraphRegistry, SharedSubgraphRelease,
    };

    /// Acquiring the same key twice yields a single slot with a reference count of two.
    #[test]
    fn acquire_reuses_existing_subgraph_by_key() {
        const KEY: &str = "canonical:posts-by-author";
        let mut registry = SharedSubgraphRegistry::new();

        let created = registry.acquire(KEY);
        let id = created.id();
        assert_eq!(created, SharedSubgraphAcquire::Created { id });

        let reused = registry.acquire(KEY);
        assert_eq!(reused, SharedSubgraphAcquire::Reused { id, ref_count: 2 });

        assert_eq!(registry.resolve(KEY), Some(id));
        assert_eq!(registry.ref_count(id), Some(2));
        assert_eq!(registry.len(), 1);
    }

    /// Releasing the final reference reports teardown and empties the registry.
    #[test]
    fn release_reports_teardown_on_last_reference() {
        const KEY: &str = "canonical:recent-posts";
        let mut registry = SharedSubgraphRegistry::new();
        let id = registry.acquire(KEY).id();
        registry.acquire(KEY);

        let first_release = registry.release(id);
        assert_eq!(
            first_release,
            Some(SharedSubgraphRelease::StillReferenced { id, ref_count: 1 })
        );

        let second_release = registry.release(id);
        let expected_teardown = SharedSubgraphRelease::Teardown {
            id,
            key: KEY.to_owned(),
        };
        assert_eq!(second_release, Some(expected_teardown));

        assert_eq!(registry.resolve(KEY), None);
        assert!(registry.is_empty());
    }

    /// Releasing an identifier the registry never issued returns `None`.
    #[test]
    fn release_unknown_subgraph_is_noop() {
        let mut registry = SharedSubgraphRegistry::new();
        let unknown = SharedSubgraphId::new(99);

        assert_eq!(registry.release(unknown), None);
    }
}