Skip to main content

task_graph_mcp/
subscriptions.rs

1//! MCP resource subscription manager.
2//!
3//! Tracks which resource URIs have been subscribed to by the connected MCP
4//! client. When a tool call mutates state that corresponds to a subscribed
5//! resource, the server sends a `notifications/resources/updated` notification
6//! to the client so it can re-fetch the resource.
7//!
8//! Because MCP stdio transport is single-client, we only need to track one
9//! peer's subscriptions. The manager stores the set of subscribed URIs and
10//! provides a method to determine which URIs should be notified after a
11//! particular category of mutation.
12
13use std::collections::HashSet;
14use std::sync::Mutex;
15
16/// Categories of mutations that affect resources.
17/// When a tool call completes, it reports which categories of data changed,
18/// and the SubscriptionManager maps those to affected resource URIs.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
20pub enum MutationKind {
21    /// A task was created, updated, deleted, or had its status changed.
22    TaskChanged,
23    /// A dependency link was created, removed, or changed.
24    DependencyChanged,
25    /// A file mark was added or removed.
26    FileMarkChanged,
27    /// An agent connected, disconnected, or was cleaned up.
28    AgentChanged,
29    /// An attachment was added or removed.
30    AttachmentChanged,
31}
32
33impl MutationKind {
34    /// Return the set of resource URIs that are potentially affected by this
35    /// kind of mutation.
36    pub fn affected_uris(&self) -> &'static [&'static str] {
37        match self {
38            MutationKind::TaskChanged => &[
39                "query://tasks/all",
40                "query://tasks/ready",
41                "query://tasks/blocked",
42                "query://tasks/claimed",
43                "query://stats/summary",
44            ],
45            MutationKind::DependencyChanged => &[
46                "query://tasks/all",
47                "query://tasks/ready",
48                "query://tasks/blocked",
49                "query://stats/summary",
50            ],
51            MutationKind::FileMarkChanged => &["query://files/marks"],
52            MutationKind::AgentChanged => &[
53                "query://agents/all",
54                "query://tasks/claimed",
55                "query://stats/summary",
56            ],
57            MutationKind::AttachmentChanged => &["query://tasks/all", "query://stats/summary"],
58        }
59    }
60}
61
62/// Manages resource subscriptions for the connected MCP client.
63///
64/// Thread-safe: uses an internal `Mutex` so it can be shared across async
65/// tasks without requiring `&mut self`.
66pub struct SubscriptionManager {
67    /// Set of resource URIs the client has subscribed to.
68    subscribed: Mutex<HashSet<String>>,
69}
70
71impl SubscriptionManager {
72    /// Create a new empty subscription manager.
73    pub fn new() -> Self {
74        Self {
75            subscribed: Mutex::new(HashSet::new()),
76        }
77    }
78
79    /// Subscribe to a resource URI. Returns `true` if newly added.
80    pub fn subscribe(&self, uri: &str) -> bool {
81        let mut set = self.subscribed.lock().unwrap();
82        set.insert(uri.to_string())
83    }
84
85    /// Unsubscribe from a resource URI. Returns `true` if was present.
86    pub fn unsubscribe(&self, uri: &str) -> bool {
87        let mut set = self.subscribed.lock().unwrap();
88        set.remove(uri)
89    }
90
91    /// Check if any subscriptions are registered.
92    pub fn has_subscriptions(&self) -> bool {
93        let set = self.subscribed.lock().unwrap();
94        !set.is_empty()
95    }
96
97    /// Given a set of mutation kinds, return the subscribed URIs that need
98    /// notification. Only returns URIs that the client has actually subscribed to.
99    pub fn affected_subscriptions(&self, mutations: &[MutationKind]) -> Vec<String> {
100        let set = self.subscribed.lock().unwrap();
101        if set.is_empty() {
102            return Vec::new();
103        }
104
105        let mut result = HashSet::new();
106        for kind in mutations {
107            for uri in kind.affected_uris() {
108                if set.contains(*uri) {
109                    result.insert((*uri).to_string());
110                }
111            }
112        }
113        result.into_iter().collect()
114    }
115}
116
117impl Default for SubscriptionManager {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126
127    #[test]
128    fn test_subscribe_unsubscribe() {
129        let mgr = SubscriptionManager::new();
130        assert!(!mgr.has_subscriptions());
131
132        // Subscribe
133        assert!(mgr.subscribe("query://tasks/all"));
134        assert!(mgr.has_subscriptions());
135
136        // Duplicate subscribe returns false
137        assert!(!mgr.subscribe("query://tasks/all"));
138
139        // Unsubscribe
140        assert!(mgr.unsubscribe("query://tasks/all"));
141        assert!(!mgr.has_subscriptions());
142
143        // Unsubscribe missing returns false
144        assert!(!mgr.unsubscribe("query://tasks/all"));
145    }
146
147    #[test]
148    fn test_affected_subscriptions() {
149        let mgr = SubscriptionManager::new();
150        mgr.subscribe("query://tasks/all");
151        mgr.subscribe("query://files/marks");
152
153        // TaskChanged should include query://tasks/all but not query://files/marks
154        let affected = mgr.affected_subscriptions(&[MutationKind::TaskChanged]);
155        assert!(affected.contains(&"query://tasks/all".to_string()));
156        assert!(!affected.contains(&"query://files/marks".to_string()));
157
158        // FileMarkChanged should include query://files/marks
159        let affected = mgr.affected_subscriptions(&[MutationKind::FileMarkChanged]);
160        assert!(affected.contains(&"query://files/marks".to_string()));
161        assert!(!affected.contains(&"query://tasks/all".to_string()));
162
163        // Combined mutations
164        let affected =
165            mgr.affected_subscriptions(&[MutationKind::TaskChanged, MutationKind::FileMarkChanged]);
166        assert!(affected.contains(&"query://tasks/all".to_string()));
167        assert!(affected.contains(&"query://files/marks".to_string()));
168    }
169
170    #[test]
171    fn test_no_subscriptions_returns_empty() {
172        let mgr = SubscriptionManager::new();
173        let affected = mgr.affected_subscriptions(&[MutationKind::TaskChanged]);
174        assert!(affected.is_empty());
175    }
176
177    #[test]
178    fn test_unsubscribed_uri_not_notified() {
179        let mgr = SubscriptionManager::new();
180        // Subscribe only to query://files/marks, not query://tasks/all
181        mgr.subscribe("query://files/marks");
182
183        let affected = mgr.affected_subscriptions(&[MutationKind::TaskChanged]);
184        assert!(affected.is_empty()); // query://tasks/all is not subscribed
185    }
186}