ta_changeset/
multi_channel.rs1use crate::interaction::{
8 ChannelCapabilities, InteractionRequest, InteractionResponse, Notification,
9};
10use crate::review_channel::{ReviewChannel, ReviewChannelError};
11
12#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum MultiChannelStrategy {
16 #[default]
18 FirstResponse,
19 Quorum { quorum_size: usize },
21}
22
23pub struct MultiReviewChannel {
30 channels: Vec<Box<dyn ReviewChannel>>,
31 strategy: MultiChannelStrategy,
32}
33
34impl MultiReviewChannel {
35 pub fn new(channels: Vec<Box<dyn ReviewChannel>>, strategy: MultiChannelStrategy) -> Self {
40 assert!(
41 !channels.is_empty(),
42 "MultiReviewChannel requires at least one inner channel"
43 );
44 Self { channels, strategy }
45 }
46
47 pub fn single(channel: Box<dyn ReviewChannel>) -> Self {
49 Self {
50 channels: vec![channel],
51 strategy: MultiChannelStrategy::FirstResponse,
52 }
53 }
54
55 pub fn len(&self) -> usize {
57 self.channels.len()
58 }
59
60 pub fn is_empty(&self) -> bool {
62 self.channels.is_empty()
63 }
64
65 pub fn strategy(&self) -> &MultiChannelStrategy {
67 &self.strategy
68 }
69
70 pub fn inner_channel_ids(&self) -> Vec<&str> {
72 self.channels.iter().map(|c| c.channel_id()).collect()
73 }
74}
75
76impl ReviewChannel for MultiReviewChannel {
77 fn request_interaction(
78 &self,
79 request: &InteractionRequest,
80 ) -> Result<InteractionResponse, ReviewChannelError> {
81 match &self.strategy {
82 MultiChannelStrategy::FirstResponse => {
83 let mut last_err = None;
84 for channel in &self.channels {
85 match channel.request_interaction(request) {
86 Ok(response) => {
87 tracing::info!(
88 channel_id = channel.channel_id(),
89 interaction_id = %request.interaction_id,
90 "multi-channel: got response from channel"
91 );
92 return Ok(response);
93 }
94 Err(e) => {
95 tracing::warn!(
96 channel_id = channel.channel_id(),
97 error = %e,
98 "multi-channel: channel failed, trying next"
99 );
100 last_err = Some(e);
101 }
102 }
103 }
104 Err(last_err
105 .unwrap_or_else(|| ReviewChannelError::Other("no channels available".into())))
106 }
107 MultiChannelStrategy::Quorum { quorum_size } => {
108 let mut approvals = 0usize;
109 let mut last_response = None;
110 let mut errors = Vec::new();
111
112 for channel in &self.channels {
113 match channel.request_interaction(request) {
114 Ok(response) => {
115 approvals += 1;
116 last_response = Some(response);
117 if approvals >= *quorum_size {
118 tracing::info!(
119 approvals,
120 quorum_size,
121 "multi-channel: quorum reached"
122 );
123 return Ok(last_response.unwrap());
124 }
125 }
126 Err(e) => {
127 tracing::warn!(
128 channel_id = channel.channel_id(),
129 error = %e,
130 "multi-channel: channel failed in quorum"
131 );
132 errors.push(e);
133 }
134 }
135 }
136
137 if let Some(response) = last_response {
139 tracing::warn!(
140 approvals,
141 quorum_size,
142 "multi-channel: quorum not reached, returning best response"
143 );
144 Ok(response)
145 } else {
146 Err(errors.into_iter().next().unwrap_or_else(|| {
147 ReviewChannelError::Other(format!(
148 "quorum not reached: needed {quorum_size} approvals, got {approvals}"
149 ))
150 }))
151 }
152 }
153 }
154 }
155
156 fn notify(&self, notification: &Notification) -> Result<(), ReviewChannelError> {
157 let mut last_err = None;
158 let mut delivered = 0usize;
159
160 for channel in &self.channels {
161 match channel.notify(notification) {
162 Ok(()) => delivered += 1,
163 Err(e) => {
164 tracing::warn!(
165 channel_id = channel.channel_id(),
166 error = %e,
167 "multi-channel: notification delivery failed on channel"
168 );
169 last_err = Some(e);
170 }
171 }
172 }
173
174 if delivered > 0 {
175 Ok(())
176 } else {
177 Err(last_err.unwrap_or_else(|| {
178 ReviewChannelError::Other("no channels delivered notification".into())
179 }))
180 }
181 }
182
183 fn capabilities(&self) -> ChannelCapabilities {
184 let mut caps = ChannelCapabilities::default();
187 for channel in &self.channels {
188 let c = channel.capabilities();
189 caps.supports_async = caps.supports_async || c.supports_async;
190 caps.supports_rich_media = caps.supports_rich_media || c.supports_rich_media;
191 caps.supports_threads = caps.supports_threads || c.supports_threads;
192 }
193 caps
194 }
195
196 fn channel_id(&self) -> &str {
197 "multi-channel"
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::interaction::{InteractionKind, Urgency};
    use crate::terminal_channel::AutoApproveChannel;

    /// Test double whose every operation fails, used to exercise fallback paths.
    #[derive(Debug)]
    struct FailingChannel;

    impl ReviewChannel for FailingChannel {
        fn request_interaction(
            &self,
            _request: &InteractionRequest,
        ) -> Result<InteractionResponse, ReviewChannelError> {
            Err(ReviewChannelError::Other("simulated failure".into()))
        }

        fn notify(&self, _notification: &Notification) -> Result<(), ReviewChannelError> {
            Err(ReviewChannelError::Other("simulated failure".into()))
        }

        fn capabilities(&self) -> ChannelCapabilities {
            ChannelCapabilities::default()
        }

        fn channel_id(&self) -> &str {
            "failing"
        }
    }

    /// Builds a blocking draft-review request with a fixed payload.
    fn test_request() -> InteractionRequest {
        InteractionRequest::new(
            InteractionKind::DraftReview,
            serde_json::json!({"draft_id": "test"}),
            Urgency::Blocking,
        )
    }

    /// Builds an informational notification with a fresh id and timestamp.
    fn test_notification() -> Notification {
        Notification {
            notification_id: uuid::Uuid::new_v4(),
            level: crate::interaction::NotificationLevel::Info,
            message: "test notification".into(),
            created_at: chrono::Utc::now(),
            goal_id: None,
        }
    }

    #[test]
    fn single_channel_passthrough() {
        let ch = MultiReviewChannel::single(Box::new(AutoApproveChannel::new()));
        assert_eq!(ch.len(), 1);
        assert_eq!(ch.channel_id(), "multi-channel");
        let resp = ch.request_interaction(&test_request());
        assert!(resp.is_ok());
    }

    #[test]
    fn multi_channel_first_response() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![
            Box::new(AutoApproveChannel::new()),
            Box::new(AutoApproveChannel::new()),
        ];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        assert_eq!(ch.len(), 2);
        let resp = ch.request_interaction(&test_request());
        assert!(resp.is_ok());
    }

    #[test]
    fn first_response_skips_failed_channel() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![
            Box::new(FailingChannel),
            Box::new(AutoApproveChannel::new()),
        ];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        assert!(ch.request_interaction(&test_request()).is_ok());
    }

    #[test]
    fn first_response_all_fail_returns_error() {
        let channels: Vec<Box<dyn ReviewChannel>> =
            vec![Box::new(FailingChannel), Box::new(FailingChannel)];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        assert!(ch.request_interaction(&test_request()).is_err());
    }

    #[test]
    fn multi_channel_quorum() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![
            Box::new(AutoApproveChannel::new()),
            Box::new(AutoApproveChannel::new()),
            Box::new(AutoApproveChannel::new()),
        ];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::Quorum { quorum_size: 2 });
        let resp = ch.request_interaction(&test_request());
        assert!(resp.is_ok());
    }

    #[test]
    fn quorum_all_fail_returns_error() {
        let channels: Vec<Box<dyn ReviewChannel>> =
            vec![Box::new(FailingChannel), Box::new(FailingChannel)];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::Quorum { quorum_size: 1 });
        assert!(ch.request_interaction(&test_request()).is_err());
    }

    #[test]
    fn notify_fans_out() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![
            Box::new(AutoApproveChannel::new()),
            Box::new(AutoApproveChannel::new()),
        ];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        let notif = test_notification();
        assert!(ch.notify(&notif).is_ok());
    }

    #[test]
    fn notify_succeeds_if_any_channel_delivers() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![
            Box::new(FailingChannel),
            Box::new(AutoApproveChannel::new()),
        ];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        let notif = test_notification();
        assert!(ch.notify(&notif).is_ok());
    }

    #[test]
    fn notify_fails_when_every_channel_fails() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![Box::new(FailingChannel)];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        let notif = test_notification();
        assert!(ch.notify(&notif).is_err());
    }

    #[test]
    fn inner_channel_ids() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![
            Box::new(AutoApproveChannel::new()),
            Box::new(AutoApproveChannel::new()),
        ];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        let ids = ch.inner_channel_ids();
        assert_eq!(ids.len(), 2);
    }

    #[test]
    fn capabilities_merge() {
        let channels: Vec<Box<dyn ReviewChannel>> = vec![Box::new(AutoApproveChannel::new())];
        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
        let caps = ch.capabilities();
        assert!(!caps.supports_rich_media);
    }

    #[test]
    #[should_panic(expected = "requires at least one")]
    fn empty_channels_panic() {
        let _ch = MultiReviewChannel::new(vec![], MultiChannelStrategy::FirstResponse);
    }

    #[test]
    fn strategy_accessor() {
        let ch = MultiReviewChannel::single(Box::new(AutoApproveChannel::new()));
        assert_eq!(ch.strategy(), &MultiChannelStrategy::FirstResponse);
    }
}