1use tokio::sync::{broadcast, mpsc};
2use tonic::Status;
3
4use crate::server::ProtocolServer;
5use crate::{WatchEvent, WatchRequest};
6
7pub async fn handle_watch(
17 server: &ProtocolServer,
18 req: WatchRequest,
19 tx: mpsc::Sender<Result<WatchEvent, Status>>,
20) {
21 let session = match server.validate_session(&req.session_id) {
22 Ok(s) => s,
23 Err(e) => {
24 let _ = tx.send(Err(e)).await;
25 return;
26 }
27 };
28
29 let repo_id = if !req.repo_id.is_empty() {
32 req.repo_id.clone()
33 } else {
34 match server.engine().get_repo(&session.codebase).await {
36 Ok((rid, _git_repo)) => rid.to_string(),
37 Err(_) => {
38 String::new()
40 }
41 }
42 };
43
44 let mut rx = if repo_id.is_empty() {
46 server.event_bus().subscribe_all()
47 } else {
48 server.event_bus().subscribe(&repo_id)
49 };
50
51 let filter = &req.filter;
52 let self_session_id = req.session_id.clone();
53
54 loop {
55 match rx.recv().await {
56 Ok(event) => {
57 if event.session_id == self_session_id {
59 continue;
60 }
61
62 if matches_filter(&event.event_type, filter)
63 && tx.send(Ok(event)).await.is_err()
64 {
65 break;
66 }
67 }
68 Err(broadcast::error::RecvError::Lagged(n)) => {
69 tracing::warn!("watch stream lagged by {} events", n);
70 }
71 Err(broadcast::error::RecvError::Closed) => break,
72 }
73 }
74}
75
/// Decides whether `event_type` is selected by the watch `filter`.
///
/// Supported filter forms, checked in this order:
///
/// * `""` or `"*"` — matches every event type.
/// * `"<prefix>.*"` — matches when the event type is `<prefix>` immediately
///   followed by a `.` (so `changeset.*` accepts `changeset.merged` but not
///   `changesetx.foo`).
/// * `"*.<suffix>"` — matches when the event type ends with `<suffix>`
///   immediately preceded by a `.` (so `*.merged` accepts `branch.merged` but
///   not `xmerged`).
/// * anything else — matches only on exact string equality.
fn matches_filter(event_type: &str, filter: &str) -> bool {
    if matches!(filter, "" | "*") {
        return true;
    }

    // Trailing wildcard: what remains after the prefix must begin with the
    // segment separator.
    if let Some(namespace) = filter.strip_suffix(".*") {
        return matches!(
            event_type.strip_prefix(namespace),
            Some(rest) if rest.starts_with('.')
        );
    }

    // Leading wildcard: what precedes the suffix must end with the segment
    // separator (which also rules out an empty head).
    if let Some(action) = filter.strip_prefix("*.") {
        return matches!(
            event_type.strip_suffix(action),
            Some(head) if head.ends_with('.')
        );
    }

    event_type == filter
}
99
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty filter places no restriction on the event type.
    #[test]
    fn empty_filter_matches_all() {
        for event in ["changeset.submitted", "anything"] {
            assert!(matches_filter(event, ""));
        }
    }

    /// A bare `*` behaves the same as an empty filter.
    #[test]
    fn star_matches_all() {
        for event in ["changeset.submitted", "anything"] {
            assert!(matches_filter(event, "*"));
        }
    }

    /// `<prefix>.*` requires the prefix to be followed by a `.` separator.
    #[test]
    fn prefix_glob() {
        let filter = "changeset.*";

        for accepted in [
            "changeset.submitted",
            "changeset.merged",
            "changeset.verified",
        ] {
            assert!(matches_filter(accepted, filter));
        }

        for rejected in ["branch.created", "changesetx.foo"] {
            assert!(!matches_filter(rejected, filter));
        }
    }

    /// `*.<suffix>` requires the suffix to be preceded by a `.` separator.
    #[test]
    fn suffix_glob() {
        let filter = "*.merged";

        for accepted in ["changeset.merged", "branch.merged"] {
            assert!(matches_filter(accepted, filter));
        }

        for rejected in ["changeset.submitted", "xmerged"] {
            assert!(!matches_filter(rejected, filter));
        }
    }

    /// A filter without wildcards matches only the identical event type.
    #[test]
    fn exact_match() {
        let filter = "changeset.submitted";

        assert!(matches_filter("changeset.submitted", filter));
        assert!(!matches_filter("changeset.merged", filter));
    }
}