everruns_core/channel.rs
// Multi-platform channel abstractions
//
// Design Decision: Channel adapters are the boundary between platform-specific
// protocols (Slack, Discord, Teams, Telegram) and the platform-agnostic core.
// Each adapter translates inbound platform events into InboundChannelEvent and
// receives OutboundChannelMessage for delivery. The core never imports
// platform-specific types.
//
// Design Decision: ThreadContext carries participant tracking across all
// platforms. Multi-user threads (e.g. a Slack thread with 3 people, a Discord
// channel) share a single session with per-message ExternalActor attribution.
// ThreadContext is the "who's in this conversation" view; ExternalActor is the
// "who sent this message" view.
//
// Design Decision: Async agent invocations are first-class. The
// ChannelDeliveryAdapter trait models the webhook→ack→async-response pattern
// generically. Platform adapters implement `deliver()` to post results back
// when the agent finishes (minutes to hours later).
//
// Design Decision: Platform-contributed tools (e.g. slack_add_reaction,
// discord_create_thread) are a known gap. The Capability trait already supports
// tools(), but no channel adapter contributes tools yet. Tracked for future
// work — see TODO(platform-tools) below.
24
25use crate::message::ExternalActor;
26use crate::typed_id::SessionId;
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30
31// ============================================
32// Thread & Participant tracking
33// ============================================
34
35/// A participant in a multi-user thread.
36///
37/// Wraps ExternalActor with thread-level metadata (when they joined,
38/// their role in the thread). Participants are accumulated over the
39/// lifetime of a session.
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
41pub struct Participant {
42 /// The external actor identity (platform user ID, display name, source).
43 pub actor: ExternalActor,
44 /// When this participant first appeared in the thread.
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub first_seen_at: Option<chrono::DateTime<chrono::Utc>>,
47 /// Platform-specific role (e.g. "owner", "member", "guest").
48 /// Not all platforms expose this.
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub role: Option<String>,
51}
52
53/// Thread-level context for multi-user conversations.
54///
55/// A ThreadContext is created when a session is bound to a platform thread
56/// and accumulates participants as messages arrive. Platform adapters update
57/// this when new users join a thread.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
59pub struct ThreadContext {
60 /// Platform-specific thread identifier (e.g. Slack thread_ts, Discord channel_id).
61 pub thread_ref: String,
62 /// Source platform (e.g. "slack", "discord", "teams").
63 pub platform: String,
64 /// Platform-specific channel/workspace context.
65 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
66 pub platform_metadata: HashMap<String, String>,
67 /// Known participants in this thread, keyed by actor_id for O(1) lookup.
68 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
69 pub participants: HashMap<String, Participant>,
70}
71
72impl ThreadContext {
73 /// Create a new thread context for a platform thread.
74 pub fn new(thread_ref: impl Into<String>, platform: impl Into<String>) -> Self {
75 Self {
76 thread_ref: thread_ref.into(),
77 platform: platform.into(),
78 platform_metadata: HashMap::new(),
79 participants: HashMap::new(),
80 }
81 }
82
83 /// Record a participant. Updates first_seen_at only if new.
84 /// Returns true if this is a newly seen participant.
85 pub fn track_participant(&mut self, actor: &ExternalActor) -> bool {
86 use std::collections::hash_map::Entry;
87 match self.participants.entry(actor.actor_id.clone()) {
88 Entry::Vacant(entry) => {
89 entry.insert(Participant {
90 actor: actor.clone(),
91 first_seen_at: Some(chrono::Utc::now()),
92 role: None,
93 });
94 true
95 }
96 Entry::Occupied(mut entry) => {
97 // Update display name if it changed (user renamed)
98 if actor.actor_name != entry.get().actor.actor_name {
99 entry.get_mut().actor.actor_name = actor.actor_name.clone();
100 }
101 false
102 }
103 }
104 }
105
106 /// Number of distinct participants.
107 pub fn participant_count(&self) -> usize {
108 self.participants.len()
109 }
110
111 /// Build a summary line for LLM context injection.
112 /// e.g. "Thread participants: Alice, Bob, Charlie"
113 pub fn participants_summary(&self) -> String {
114 if self.participants.is_empty() {
115 return String::new();
116 }
117 let mut names: Vec<String> = self
118 .participants
119 .values()
120 .map(|p| p.actor.display_label().to_string())
121 .collect();
122 names.sort();
123 format!("Thread participants: {}", names.join(", "))
124 }
125}
126
127// ============================================
128// Inbound channel events
129// ============================================
130
131/// A platform-agnostic inbound event from a channel.
132///
133/// Platform adapters parse their native webhook payloads into this type.
134/// The server routes it to the correct session and creates the appropriate
135/// input.message event.
136#[derive(Debug, Clone)]
137pub struct InboundChannelEvent {
138 /// Who sent this message.
139 pub actor: ExternalActor,
140 /// Message text content (may be empty for attachment-only messages).
141 pub text: String,
142 /// Attached content (images, files) as platform-agnostic parts.
143 pub attachments: Vec<InboundAttachment>,
144 /// Platform-specific dedup key (e.g. Slack event_ts, Discord message_id).
145 /// Used to prevent duplicate processing on webhook retries.
146 pub dedup_key: String,
147 /// Thread reference for routing to the correct session.
148 /// None for DMs or platforms without threading.
149 pub thread_ref: Option<String>,
150 /// Platform-specific metadata for session tag construction.
151 pub routing_metadata: HashMap<String, String>,
152}
153
/// A single attachment carried by an inbound platform message.
#[derive(Clone, Debug)]
pub enum InboundAttachment {
    /// An image with a fetchable URL.
    Image {
        /// URL of the image.
        url: String,
        /// Alternative text, when available.
        alt_text: Option<String>,
    },
    /// A non-image file, described as text.
    FileDescription {
        /// File name.
        name: String,
        /// MIME type, when available.
        mime_type: Option<String>,
    },
}
168
169// ============================================
170// Outbound channel messages
171// ============================================
172
173/// A platform-agnostic outbound message to deliver to a channel.
174///
175/// The delivery adapter translates this into platform-specific API calls
176/// (e.g. Slack chat.postMessage, Discord channel message create).
177#[derive(Debug, Clone)]
178pub struct OutboundChannelMessage {
179 /// The session this message belongs to.
180 pub session_id: SessionId,
181 /// Text content to deliver.
182 pub text: String,
183 /// Thread reference for reply targeting.
184 pub thread_ref: String,
185 /// Whether this is a progress report (vs. a final answer).
186 pub is_progress_report: bool,
187}
188
189// ============================================
190// Channel delivery adapter (async agent responses)
191// ============================================
192
193/// Reply mode for channel delivery — controls which agent output reaches the channel.
194///
195/// Generalizes SlackReplyMode to work across all platforms.
196#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
197#[serde(rename_all = "snake_case")]
198pub enum ChannelReplyMode {
199 /// Forward all completed assistant messages to the channel.
200 #[default]
201 AllMessages,
202 /// Only deliver explicit report_progress tool outputs.
203 ReportProgressOnly,
204}
205
206/// Trait for platform-specific delivery of agent responses.
207///
208/// Implementations handle the "last mile" of posting messages back to
209/// Slack, Discord, Teams, etc. The generic delivery dispatcher calls
210/// these methods; platform adapters implement them.
211///
212/// Lifecycle:
213/// 1. Webhook arrives → adapter parses InboundChannelEvent
214/// 2. Server routes to session, creates input.message, triggers agent
215/// 3. Core delivery dispatcher records a pending delivery for this session/turn
216/// 4. Agent runs asynchronously (seconds to hours)
217/// 5. When agent output is ready, the delivery dispatcher calls `deliver()`
218/// 6. When the turn completes or is cancelled, the dispatcher clears the pending delivery
219#[async_trait]
220pub trait ChannelDeliveryAdapter: Send + Sync {
221 /// Platform identifier (e.g. "slack", "discord").
222 fn platform(&self) -> &str;
223
224 /// Deliver a message to the platform channel.
225 ///
226 /// Called by the generic delivery dispatcher when agent output is ready.
227 /// Implementations should handle retries internally for transient failures.
228 async fn deliver(
229 &self,
230 message: &OutboundChannelMessage,
231 context: &DeliveryContext,
232 ) -> DeliveryResult;
233
234 /// Send an immediate acknowledgement to the channel.
235 ///
236 /// Called right after webhook ingestion for async agent invocations.
237 /// e.g. Slack's "On it." message in report_progress_only mode.
238 /// Platforms that don't need an ack can return Ok(()).
239 async fn send_ack(
240 &self,
241 thread_ref: &str,
242 text: &str,
243 context: &DeliveryContext,
244 ) -> DeliveryResult;
245
246 /// Format a progress report for this platform.
247 ///
248 /// Different platforms have different formatting (Slack mrkdwn, Discord markdown, etc.)
249 fn format_progress_report(
250 &self,
251 report: &crate::progress_reporting::ProgressReportPayload,
252 ) -> String;
253}
254
255/// Context needed by a delivery adapter to post messages.
256///
257/// Stored when a delivery is registered, consumed when events arrive.
258/// Platform adapters extend this with platform-specific fields via `extra`.
259#[derive(Clone)]
260pub struct DeliveryContext {
261 /// Bot/app authentication token for the platform API.
262 pub auth_token: String,
263 /// Platform-specific channel/conversation ID.
264 pub channel_id: String,
265 /// Thread reference for reply targeting.
266 pub thread_ref: String,
267 /// Reply mode controlling which output is delivered.
268 pub reply_mode: ChannelReplyMode,
269 /// Platform-specific extra context (e.g. team_id, workspace URL).
270 pub extra: HashMap<String, String>,
271}
272
273impl std::fmt::Debug for DeliveryContext {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 f.debug_struct("DeliveryContext")
276 .field("auth_token", &"[REDACTED]")
277 .field("channel_id", &self.channel_id)
278 .field("thread_ref", &self.thread_ref)
279 .field("reply_mode", &self.reply_mode)
280 .field("extra", &self.extra)
281 .finish()
282 }
283}
284
/// Outcome of a single delivery attempt.
///
/// The error variants carry a `String` payload.
#[derive(Debug)]
pub enum DeliveryResult {
    /// The message was delivered successfully.
    Ok,
    /// Transient failure — caller should retry with backoff.
    TransientError(String),
    /// Permanent failure — do not retry (e.g. invalid token, channel deleted).
    PermanentError(String),
}
295
296// ============================================
297// Session strategy (generalized from Slack)
298// ============================================
299
300/// Channel-agnostic session routing tag builder.
301///
302/// Given platform metadata from an InboundChannelEvent, produces the
303/// session tags used to find or create the correct session.
304pub fn build_session_routing_tag(
305 platform: &str,
306 strategy: &SessionRoutingStrategy,
307 metadata: &HashMap<String, String>,
308) -> Option<String> {
309 match strategy {
310 SessionRoutingStrategy::PerThread => metadata
311 .get("thread_ref")
312 .map(|t| format!("{}:thread:{}", platform, t)),
313 SessionRoutingStrategy::PerChannel => metadata
314 .get("channel_id")
315 .map(|c| format!("{}:channel:{}", platform, c)),
316 SessionRoutingStrategy::PerUser => metadata
317 .get("user_id")
318 .map(|u| format!("{}:user:{}", platform, u)),
319 }
320}
321
322/// How incoming messages map to sessions — generalized from Slack's SessionStrategy.
323#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
324#[serde(rename_all = "snake_case")]
325pub enum SessionRoutingStrategy {
326 /// Each thread gets its own session (default).
327 #[default]
328 PerThread,
329 /// One session per channel/conversation.
330 PerChannel,
331 /// One session per user across all threads.
332 PerUser,
333}
334
335// TODO(platform-tools): Channel adapters should optionally contribute
336// platform-specific tools via the Capability trait. Examples:
337// - Slack: add_reaction, post_to_channel, create_thread, upload_file
338// - Discord: create_thread, add_reaction, pin_message
339// - Teams: send_adaptive_card, create_tab
340// The plumbing exists (Capability::tools()), but no adapter uses it yet.
341// When implementing, tools should receive platform context via ToolContext
342// (which already has session access for looking up channel config).
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: an `ExternalActor` without metadata.
    fn actor(id: &'static str, name: Option<&'static str>, source: &'static str) -> ExternalActor {
        ExternalActor {
            actor_id: id.into(),
            actor_name: name.map(Into::into),
            source: source.into(),
            metadata: None,
        }
    }

    /// Fixture: routing metadata holding exactly one entry.
    fn single_entry(key: &str, value: &str) -> HashMap<String, String> {
        std::iter::once((key.to_string(), value.to_string())).collect()
    }

    #[test]
    fn test_thread_context_track_participant() {
        let alice = actor("U001", Some("Alice"), "slack");
        let mut ctx = ThreadContext::new("1234.5678", "slack");

        let first_time = ctx.track_participant(&alice);
        let second_time = ctx.track_participant(&alice);

        assert!(first_time);
        assert!(!second_time); // already known, so not new
        assert_eq!(ctx.participant_count(), 1);
    }

    #[test]
    fn test_thread_context_updates_display_name() {
        let before = actor("U001", Some("Alice"), "slack");
        let after = actor("U001", Some("Alice B."), "slack");
        let mut ctx = ThreadContext::new("thread_1", "slack");

        ctx.track_participant(&before);
        ctx.track_participant(&after);

        let stored = ctx.participants.get("U001").unwrap();
        assert_eq!(stored.actor.actor_name.as_deref(), Some("Alice B."));
    }

    #[test]
    fn test_thread_context_participants_summary() {
        let mut ctx = ThreadContext::new("thread_1", "discord");
        assert_eq!(ctx.participants_summary(), "");

        for participant in [
            actor("U001", Some("Alice"), "discord"),
            actor("U002", None, "discord"),
        ] {
            ctx.track_participant(&participant);
        }

        let summary = ctx.participants_summary();
        assert!(summary.contains("Alice"));
        assert!(summary.contains("U002")); // no name, so the actor_id is shown
    }

    #[test]
    fn test_build_session_routing_tag_per_thread() {
        let meta = single_entry("thread_ref", "1234.5678");
        let tag = build_session_routing_tag("slack", &SessionRoutingStrategy::PerThread, &meta);
        assert_eq!(tag, Some("slack:thread:1234.5678".into()));
    }

    #[test]
    fn test_build_session_routing_tag_per_channel() {
        let meta = single_entry("channel_id", "C0123");
        let tag = build_session_routing_tag("discord", &SessionRoutingStrategy::PerChannel, &meta);
        assert_eq!(tag, Some("discord:channel:C0123".into()));
    }

    #[test]
    fn test_build_session_routing_tag_per_user() {
        let meta = single_entry("user_id", "U999");
        let tag = build_session_routing_tag("teams", &SessionRoutingStrategy::PerUser, &meta);
        assert_eq!(tag, Some("teams:user:U999".into()));
    }

    #[test]
    fn test_build_session_routing_tag_missing_metadata() {
        let meta: HashMap<String, String> = HashMap::new();
        let tag = build_session_routing_tag("slack", &SessionRoutingStrategy::PerThread, &meta);
        assert_eq!(tag, None);
    }

    #[test]
    fn test_channel_reply_mode_default() {
        assert_eq!(ChannelReplyMode::default(), ChannelReplyMode::AllMessages);
    }

    #[test]
    fn test_channel_reply_mode_serde_roundtrip() {
        let encoded = serde_json::to_string(&ChannelReplyMode::ReportProgressOnly).unwrap();
        assert_eq!(encoded, r#""report_progress_only""#);

        let decoded: ChannelReplyMode = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, ChannelReplyMode::ReportProgressOnly);
    }

    #[test]
    fn test_session_routing_strategy_default() {
        assert_eq!(
            SessionRoutingStrategy::default(),
            SessionRoutingStrategy::PerThread
        );
    }

    #[test]
    fn test_session_routing_strategy_serde() {
        let encoded = serde_json::to_string(&SessionRoutingStrategy::PerChannel).unwrap();
        assert_eq!(encoded, r#""per_channel""#);

        let decoded: SessionRoutingStrategy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, SessionRoutingStrategy::PerChannel);
    }
}