Skip to main content

ta_changeset/
multi_channel.rs

1// multi_channel.rs — Multi-channel routing for ReviewChannel (v0.10.0).
2//
3// MultiReviewChannel dispatches review requests to N inner channels.
4// Configurable strategy: `first_response` (default — first Ok wins) or
5// `quorum` (require N approvals before returning).
6
7use crate::interaction::{
8    ChannelCapabilities, InteractionRequest, InteractionResponse, Notification,
9};
10use crate::review_channel::{ReviewChannel, ReviewChannelError};
11
12/// Dispatch strategy for multi-channel review.
13#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum MultiChannelStrategy {
16    /// First channel to respond wins (default).
17    #[default]
18    FirstResponse,
19    /// Require `quorum_size` approvals before returning.
20    Quorum { quorum_size: usize },
21}
22
23/// A ReviewChannel that dispatches to multiple inner channels.
24///
25/// For `request_interaction`, channels are tried sequentially — the first
26/// successful response is returned. For `notify`, all channels receive the
27/// notification (fan-out). Failures on individual channels are logged but
28/// don't prevent delivery to remaining channels.
29pub struct MultiReviewChannel {
30    channels: Vec<Box<dyn ReviewChannel>>,
31    strategy: MultiChannelStrategy,
32}
33
34impl MultiReviewChannel {
35    /// Create a new multi-channel from inner channels.
36    ///
37    /// # Panics
38    /// Panics if `channels` is empty — use a single channel directly instead.
39    pub fn new(channels: Vec<Box<dyn ReviewChannel>>, strategy: MultiChannelStrategy) -> Self {
40        assert!(
41            !channels.is_empty(),
42            "MultiReviewChannel requires at least one inner channel"
43        );
44        Self { channels, strategy }
45    }
46
47    /// Wrap a single channel (no-op wrapper for uniform handling).
48    pub fn single(channel: Box<dyn ReviewChannel>) -> Self {
49        Self {
50            channels: vec![channel],
51            strategy: MultiChannelStrategy::FirstResponse,
52        }
53    }
54
55    /// Number of inner channels.
56    pub fn len(&self) -> usize {
57        self.channels.len()
58    }
59
60    /// Whether this multi-channel is empty (should never be true).
61    pub fn is_empty(&self) -> bool {
62        self.channels.is_empty()
63    }
64
65    /// The configured dispatch strategy.
66    pub fn strategy(&self) -> &MultiChannelStrategy {
67        &self.strategy
68    }
69
70    /// Channel IDs of all inner channels.
71    pub fn inner_channel_ids(&self) -> Vec<&str> {
72        self.channels.iter().map(|c| c.channel_id()).collect()
73    }
74}
75
76impl ReviewChannel for MultiReviewChannel {
77    fn request_interaction(
78        &self,
79        request: &InteractionRequest,
80    ) -> Result<InteractionResponse, ReviewChannelError> {
81        match &self.strategy {
82            MultiChannelStrategy::FirstResponse => {
83                let mut last_err = None;
84                for channel in &self.channels {
85                    match channel.request_interaction(request) {
86                        Ok(response) => {
87                            tracing::info!(
88                                channel_id = channel.channel_id(),
89                                interaction_id = %request.interaction_id,
90                                "multi-channel: got response from channel"
91                            );
92                            return Ok(response);
93                        }
94                        Err(e) => {
95                            tracing::warn!(
96                                channel_id = channel.channel_id(),
97                                error = %e,
98                                "multi-channel: channel failed, trying next"
99                            );
100                            last_err = Some(e);
101                        }
102                    }
103                }
104                Err(last_err
105                    .unwrap_or_else(|| ReviewChannelError::Other("no channels available".into())))
106            }
107            MultiChannelStrategy::Quorum { quorum_size } => {
108                let mut approvals = 0usize;
109                let mut last_response = None;
110                let mut errors = Vec::new();
111
112                for channel in &self.channels {
113                    match channel.request_interaction(request) {
114                        Ok(response) => {
115                            approvals += 1;
116                            last_response = Some(response);
117                            if approvals >= *quorum_size {
118                                tracing::info!(
119                                    approvals,
120                                    quorum_size,
121                                    "multi-channel: quorum reached"
122                                );
123                                return Ok(last_response.unwrap());
124                            }
125                        }
126                        Err(e) => {
127                            tracing::warn!(
128                                channel_id = channel.channel_id(),
129                                error = %e,
130                                "multi-channel: channel failed in quorum"
131                            );
132                            errors.push(e);
133                        }
134                    }
135                }
136
137                // Not enough approvals.
138                if let Some(response) = last_response {
139                    tracing::warn!(
140                        approvals,
141                        quorum_size,
142                        "multi-channel: quorum not reached, returning best response"
143                    );
144                    Ok(response)
145                } else {
146                    Err(errors.into_iter().next().unwrap_or_else(|| {
147                        ReviewChannelError::Other(format!(
148                            "quorum not reached: needed {quorum_size} approvals, got {approvals}"
149                        ))
150                    }))
151                }
152            }
153        }
154    }
155
156    fn notify(&self, notification: &Notification) -> Result<(), ReviewChannelError> {
157        let mut last_err = None;
158        let mut delivered = 0usize;
159
160        for channel in &self.channels {
161            match channel.notify(notification) {
162                Ok(()) => delivered += 1,
163                Err(e) => {
164                    tracing::warn!(
165                        channel_id = channel.channel_id(),
166                        error = %e,
167                        "multi-channel: notification delivery failed on channel"
168                    );
169                    last_err = Some(e);
170                }
171            }
172        }
173
174        if delivered > 0 {
175            Ok(())
176        } else {
177            Err(last_err.unwrap_or_else(|| {
178                ReviewChannelError::Other("no channels delivered notification".into())
179            }))
180        }
181    }
182
183    fn capabilities(&self) -> ChannelCapabilities {
184        // Merge capabilities: if any channel supports a capability, the
185        // multi-channel reports it.
186        let mut caps = ChannelCapabilities::default();
187        for channel in &self.channels {
188            let c = channel.capabilities();
189            caps.supports_async = caps.supports_async || c.supports_async;
190            caps.supports_rich_media = caps.supports_rich_media || c.supports_rich_media;
191            caps.supports_threads = caps.supports_threads || c.supports_threads;
192        }
193        caps
194    }
195
196    fn channel_id(&self) -> &str {
197        "multi-channel"
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204    use crate::interaction::{InteractionKind, Urgency};
205    use crate::terminal_channel::AutoApproveChannel;
206
207    fn test_request() -> InteractionRequest {
208        InteractionRequest::new(
209            InteractionKind::DraftReview,
210            serde_json::json!({"draft_id": "test"}),
211            Urgency::Blocking,
212        )
213    }
214
215    #[test]
216    fn single_channel_passthrough() {
217        let ch = MultiReviewChannel::single(Box::new(AutoApproveChannel::new()));
218        assert_eq!(ch.len(), 1);
219        assert_eq!(ch.channel_id(), "multi-channel");
220        let resp = ch.request_interaction(&test_request());
221        assert!(resp.is_ok());
222    }
223
224    #[test]
225    fn multi_channel_first_response() {
226        let channels: Vec<Box<dyn ReviewChannel>> = vec![
227            Box::new(AutoApproveChannel::new()),
228            Box::new(AutoApproveChannel::new()),
229        ];
230        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
231        assert_eq!(ch.len(), 2);
232        let resp = ch.request_interaction(&test_request());
233        assert!(resp.is_ok());
234    }
235
236    #[test]
237    fn multi_channel_quorum() {
238        let channels: Vec<Box<dyn ReviewChannel>> = vec![
239            Box::new(AutoApproveChannel::new()),
240            Box::new(AutoApproveChannel::new()),
241            Box::new(AutoApproveChannel::new()),
242        ];
243        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::Quorum { quorum_size: 2 });
244        let resp = ch.request_interaction(&test_request());
245        assert!(resp.is_ok());
246    }
247
248    #[test]
249    fn notify_fans_out() {
250        let channels: Vec<Box<dyn ReviewChannel>> = vec![
251            Box::new(AutoApproveChannel::new()),
252            Box::new(AutoApproveChannel::new()),
253        ];
254        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
255        let notif = Notification {
256            notification_id: uuid::Uuid::new_v4(),
257            level: crate::interaction::NotificationLevel::Info,
258            message: "test notification".into(),
259            created_at: chrono::Utc::now(),
260            goal_id: None,
261        };
262        assert!(ch.notify(&notif).is_ok());
263    }
264
265    #[test]
266    fn inner_channel_ids() {
267        let channels: Vec<Box<dyn ReviewChannel>> = vec![
268            Box::new(AutoApproveChannel::new()),
269            Box::new(AutoApproveChannel::new()),
270        ];
271        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
272        let ids = ch.inner_channel_ids();
273        assert_eq!(ids.len(), 2);
274    }
275
276    #[test]
277    fn capabilities_merge() {
278        let channels: Vec<Box<dyn ReviewChannel>> = vec![Box::new(AutoApproveChannel::new())];
279        let ch = MultiReviewChannel::new(channels, MultiChannelStrategy::FirstResponse);
280        let caps = ch.capabilities();
281        // AutoApproveChannel has default capabilities
282        assert!(!caps.supports_rich_media);
283    }
284
285    #[test]
286    #[should_panic(expected = "requires at least one")]
287    fn empty_channels_panic() {
288        let _ch = MultiReviewChannel::new(vec![], MultiChannelStrategy::FirstResponse);
289    }
290
291    #[test]
292    fn strategy_accessor() {
293        let ch = MultiReviewChannel::single(Box::new(AutoApproveChannel::new()));
294        assert_eq!(ch.strategy(), &MultiChannelStrategy::FirstResponse);
295    }
296}