Skip to main content

dk_protocol/
watch.rs

1use tokio::sync::{broadcast, mpsc};
2use tonic::Status;
3
4use crate::server::ProtocolServer;
5use crate::{WatchEvent, WatchRequest};
6
7/// Long-running handler for the WATCH server-streaming RPC.
8///
9/// Subscribes to the shared [`EventBus`] and forwards every event to the
10/// client via the provided `mpsc::Sender`.  The loop terminates when:
11///
12/// * The client disconnects (send fails).
13/// * The event bus is dropped (channel closed).
14///
15/// Lagged receivers (slow consumers) log a warning and continue.
16pub async fn handle_watch(
17    server: &ProtocolServer,
18    req: WatchRequest,
19    tx: mpsc::Sender<Result<WatchEvent, Status>>,
20) {
21    let session = match server.validate_session(&req.session_id) {
22        Ok(s) => s,
23        Err(e) => {
24            let _ = tx.send(Err(e)).await;
25            return;
26        }
27    };
28
29    // Resolve repo_id: prefer the one from the request, fall back to
30    // resolving it from the session's codebase.
31    let repo_id = if !req.repo_id.is_empty() {
32        req.repo_id.clone()
33    } else {
34        // Try to resolve repo_id from the session's codebase name.
35        match server.engine().get_repo(&session.codebase).await {
36            Ok((rid, _git_repo)) => rid.to_string(),
37            Err(_) => {
38                // Fall back to subscribe_all if we can't resolve the repo.
39                String::new()
40            }
41        }
42    };
43
44    // Subscribe to the repo-specific channel, or all events if no repo resolved.
45    let mut rx = if repo_id.is_empty() {
46        server.event_bus().subscribe_all()
47    } else {
48        server.event_bus().subscribe(&repo_id)
49    };
50
51    let filter = &req.filter;
52    let self_session_id = req.session_id.clone();
53
54    loop {
55        match rx.recv().await {
56            Ok(event) => {
57                // Filter out events from the requesting session itself.
58                if event.session_id == self_session_id {
59                    continue;
60                }
61
62                if matches_filter(&event.event_type, filter)
63                    && tx.send(Ok(event)).await.is_err()
64                {
65                    break;
66                }
67            }
68            Err(broadcast::error::RecvError::Lagged(n)) => {
69                tracing::warn!("watch stream lagged by {} events", n);
70            }
71            Err(broadcast::error::RecvError::Closed) => break,
72        }
73    }
74}
75
/// Decide whether `event_type` is selected by the glob-style `filter`.
///
/// Recognised filter forms, checked in this order:
/// - `""` or `"*"`: every event type matches.
/// - `"<prefix>.*"`: matches event types that start with `<prefix>`
///   immediately followed by a dot, e.g. "changeset.*" matches
///   "changeset.submitted" but not "changesetx.foo".
/// - `"*.<suffix>"`: matches event types that end with `<suffix>`
///   immediately preceded by a dot, e.g. "*.merged" matches
///   "branch.merged" but not "xmerged".
/// - Anything else: the event type must equal the filter exactly.
fn matches_filter(event_type: &str, filter: &str) -> bool {
    if matches!(filter, "" | "*") {
        return true;
    }

    if let Some(namespace) = filter.strip_suffix(".*") {
        // What follows the namespace must begin with the separating dot.
        return matches!(
            event_type.strip_prefix(namespace),
            Some(rest) if rest.starts_with('.')
        );
    }

    if let Some(action) = filter.strip_prefix("*.") {
        // What precedes the action must end with the separating dot.
        return matches!(
            event_type.strip_suffix(action),
            Some(head) if head.ends_with('.')
        );
    }

    filter == event_type
}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `filter` selects every event type in `accepted` and
    /// none of the event types in `rejected`.
    fn check(filter: &str, accepted: &[&str], rejected: &[&str]) {
        for event_type in accepted {
            assert!(
                matches_filter(event_type, filter),
                "filter {:?} should match {:?}",
                filter,
                event_type
            );
        }
        for event_type in rejected {
            assert!(
                !matches_filter(event_type, filter),
                "filter {:?} should not match {:?}",
                filter,
                event_type
            );
        }
    }

    #[test]
    fn empty_filter_matches_all() {
        check("", &["changeset.submitted", "anything"], &[]);
    }

    #[test]
    fn star_matches_all() {
        check("*", &["changeset.submitted", "anything"], &[]);
    }

    #[test]
    fn prefix_glob() {
        check(
            "changeset.*",
            &["changeset.submitted", "changeset.merged", "changeset.verified"],
            &["branch.created", "changesetx.foo"],
        );
    }

    #[test]
    fn suffix_glob() {
        check(
            "*.merged",
            &["changeset.merged", "branch.merged"],
            &["changeset.submitted", "xmerged"],
        );
    }

    #[test]
    fn exact_match() {
        check(
            "changeset.submitted",
            &["changeset.submitted"],
            &["changeset.merged"],
        );
    }
}