Skip to main content

everruns_core/
channel.rs

1// Multi-platform channel abstractions
2//
3// Design Decision: Channel adapters are the boundary between platform-specific
4// protocols (Slack, Discord, Teams, Telegram) and the platform-agnostic core.
5// Each adapter translates inbound platform events into InboundChannelEvent and
6// receives OutboundChannelMessage for delivery. The core never imports
7// platform-specific types.
8//
9// Design Decision: ThreadContext carries participant tracking across all
10// platforms. Multi-user threads (e.g. a Slack thread with 3 people, a Discord
11// channel) share a single session with per-message ExternalActor attribution.
12// ThreadContext is the "who's in this conversation" view; ExternalActor is the
13// "who sent this message" view.
14//
15// Design Decision: Async agent invocations are first-class. The
16// ChannelDeliveryAdapter trait models the webhook→ack→async-response pattern
17// generically. Platform adapters implement `deliver()` to post results back
18// when the agent finishes (minutes to hours later).
19//
20// Design Decision: Platform-contributed tools (e.g. slack_add_reaction,
21// discord_create_thread) are a known gap. The Capability trait already supports
22// tools(), but no channel adapter contributes tools yet. Tracked for future
23// work — see TODO(platform-tools) below.
24
25use crate::message::ExternalActor;
26use crate::typed_id::SessionId;
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30
31// ============================================
32// Thread & Participant tracking
33// ============================================
34
/// A participant in a multi-user thread.
///
/// Wraps ExternalActor with thread-level metadata (when they joined,
/// their role in the thread). Participants are accumulated over the
/// lifetime of a session.
///
/// Both optional fields are left out of the serialized form when `None`
/// and default to `None` when missing on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Participant {
    /// The external actor identity (platform user ID, display name, source).
    pub actor: ExternalActor,
    /// When this participant first appeared in the thread.
    /// `ThreadContext::track_participant` stamps this with the current UTC
    /// time when it inserts a new participant.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_seen_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Platform-specific role (e.g. "owner", "member", "guest").
    /// Not all platforms expose this; `ThreadContext::track_participant`
    /// always inserts new participants with `None` here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
}
52
/// Thread-level context for multi-user conversations.
///
/// A ThreadContext is created when a session is bound to a platform thread
/// and accumulates participants as messages arrive. Platform adapters update
/// this when new users join a thread.
///
/// The two map fields are skipped during serialization while empty and
/// default to empty maps when absent on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ThreadContext {
    /// Platform-specific thread identifier (e.g. Slack thread_ts, Discord channel_id).
    pub thread_ref: String,
    /// Source platform (e.g. "slack", "discord", "teams").
    pub platform: String,
    /// Platform-specific channel/workspace context.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub platform_metadata: HashMap<String, String>,
    /// Known participants in this thread, keyed by actor_id for O(1) lookup.
    /// `track_participant` is the method in this file that populates it,
    /// using `ExternalActor::actor_id` as the key.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub participants: HashMap<String, Participant>,
}
71
72impl ThreadContext {
73    /// Create a new thread context for a platform thread.
74    pub fn new(thread_ref: impl Into<String>, platform: impl Into<String>) -> Self {
75        Self {
76            thread_ref: thread_ref.into(),
77            platform: platform.into(),
78            platform_metadata: HashMap::new(),
79            participants: HashMap::new(),
80        }
81    }
82
83    /// Record a participant. Updates first_seen_at only if new.
84    /// Returns true if this is a newly seen participant.
85    pub fn track_participant(&mut self, actor: &ExternalActor) -> bool {
86        use std::collections::hash_map::Entry;
87        match self.participants.entry(actor.actor_id.clone()) {
88            Entry::Vacant(entry) => {
89                entry.insert(Participant {
90                    actor: actor.clone(),
91                    first_seen_at: Some(chrono::Utc::now()),
92                    role: None,
93                });
94                true
95            }
96            Entry::Occupied(mut entry) => {
97                // Update display name if it changed (user renamed)
98                if actor.actor_name != entry.get().actor.actor_name {
99                    entry.get_mut().actor.actor_name = actor.actor_name.clone();
100                }
101                false
102            }
103        }
104    }
105
106    /// Number of distinct participants.
107    pub fn participant_count(&self) -> usize {
108        self.participants.len()
109    }
110
111    /// Build a summary line for LLM context injection.
112    /// e.g. "Thread participants: Alice, Bob, Charlie"
113    pub fn participants_summary(&self) -> String {
114        if self.participants.is_empty() {
115            return String::new();
116        }
117        let mut names: Vec<String> = self
118            .participants
119            .values()
120            .map(|p| p.actor.display_label().to_string())
121            .collect();
122        names.sort();
123        format!("Thread participants: {}", names.join(", "))
124    }
125}
126
127// ============================================
128// Inbound channel events
129// ============================================
130
/// A platform-agnostic inbound event from a channel.
///
/// Platform adapters parse their native webhook payloads into this type.
/// The server routes it to the correct session and creates the appropriate
/// input.message event.
///
/// This type derives only `Debug` and `Clone`; unlike `ThreadContext` it has
/// no serde derives.
#[derive(Debug, Clone)]
pub struct InboundChannelEvent {
    /// Who sent this message.
    pub actor: ExternalActor,
    /// Message text content (may be empty for attachment-only messages).
    pub text: String,
    /// Attached content (images, files) as platform-agnostic parts.
    pub attachments: Vec<InboundAttachment>,
    /// Platform-specific dedup key (e.g. Slack event_ts, Discord message_id).
    /// Used to prevent duplicate processing on webhook retries.
    pub dedup_key: String,
    /// Thread reference for routing to the correct session.
    /// None for DMs or platforms without threading.
    pub thread_ref: Option<String>,
    /// Platform-specific metadata for session tag construction.
    /// NOTE(review): presumably this is the map handed to
    /// `build_session_routing_tag`, which reads the keys "thread_ref",
    /// "channel_id" and "user_id" — confirm against the caller.
    pub routing_metadata: HashMap<String, String>,
}
153
/// Attachment from an inbound platform message.
///
/// Carries only references and descriptions (a URL, a file name), never the
/// attachment bytes themselves.
#[derive(Debug, Clone)]
pub enum InboundAttachment {
    /// Image with a fetchable URL.
    Image {
        /// Location the image can be fetched from.
        url: String,
        /// Optional alternative text supplied with the image.
        alt_text: Option<String>,
    },
    /// Non-image file described as text.
    FileDescription {
        /// File name as reported by the platform.
        name: String,
        /// MIME type, when the platform provides one.
        mime_type: Option<String>,
    },
}
168
169// ============================================
170// Outbound channel messages
171// ============================================
172
/// A platform-agnostic outbound message to deliver to a channel.
///
/// The delivery adapter translates this into platform-specific API calls
/// (e.g. Slack chat.postMessage, Discord channel message create).
/// It is passed by reference to `ChannelDeliveryAdapter::deliver`.
#[derive(Debug, Clone)]
pub struct OutboundChannelMessage {
    /// The session this message belongs to.
    pub session_id: SessionId,
    /// Text content to deliver.
    pub text: String,
    /// Thread reference for reply targeting.
    /// Unlike `InboundChannelEvent::thread_ref`, this is not optional.
    pub thread_ref: String,
    /// Whether this is a progress report (vs. a final answer).
    pub is_progress_report: bool,
}
188
189// ============================================
190// Channel delivery adapter (async agent responses)
191// ============================================
192
/// Reply mode for channel delivery — controls which agent output reaches the channel.
///
/// Generalizes SlackReplyMode to work across all platforms.
///
/// Serialized in snake_case: `"all_messages"` and `"report_progress_only"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ChannelReplyMode {
    /// Forward all completed assistant messages to the channel.
    /// This is the `Default` variant.
    #[default]
    AllMessages,
    /// Only deliver explicit report_progress tool outputs.
    ReportProgressOnly,
}
205
/// Trait for platform-specific delivery of agent responses.
///
/// Implementations handle the "last mile" of posting messages back to
/// Slack, Discord, Teams, etc. The generic delivery dispatcher calls
/// these methods; platform adapters implement them.
///
/// Lifecycle:
/// 1. Webhook arrives → adapter parses InboundChannelEvent
/// 2. Server routes to session, creates input.message, triggers agent
/// 3. Core delivery dispatcher records a pending delivery for this session/turn
/// 4. Agent runs asynchronously (seconds to hours)
/// 5. When agent output is ready, the delivery dispatcher calls `deliver()`
/// 6. When the turn completes or is cancelled, the dispatcher clears the pending delivery
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait ChannelDeliveryAdapter: Send + Sync {
    /// Platform identifier (e.g. "slack", "discord").
    fn platform(&self) -> &str;

    /// Deliver a message to the platform channel.
    ///
    /// Called by the generic delivery dispatcher when agent output is ready.
    /// Implementations should handle retries internally for transient failures.
    ///
    /// NOTE(review): `DeliveryResult::TransientError` is documented as
    /// "caller should retry with backoff" — confirm whether the adapter or
    /// the dispatcher owns retries.
    async fn deliver(
        &self,
        message: &OutboundChannelMessage,
        context: &DeliveryContext,
    ) -> DeliveryResult;

    /// Send an immediate acknowledgement to the channel.
    ///
    /// Called right after webhook ingestion for async agent invocations.
    /// e.g. Slack's "On it." message in report_progress_only mode.
    /// Platforms that don't need an ack can return `DeliveryResult::Ok`.
    async fn send_ack(
        &self,
        thread_ref: &str,
        text: &str,
        context: &DeliveryContext,
    ) -> DeliveryResult;

    /// Format a progress report for this platform.
    ///
    /// Different platforms have different formatting (Slack mrkdwn, Discord markdown, etc.)
    /// Synchronous, unlike `deliver` and `send_ack`; returns the formatted text.
    fn format_progress_report(
        &self,
        report: &crate::progress_reporting::ProgressReportPayload,
    ) -> String;
}
254
/// Context needed by a delivery adapter to post messages.
///
/// Stored when a delivery is registered, consumed when events arrive.
/// Platform adapters extend this with platform-specific fields via `extra`.
///
/// `Debug` is deliberately not derived: the hand-written impl in this file
/// prints `auth_token` as `[REDACTED]`. Every other field, including
/// `extra`, is printed verbatim, so do not store secrets in `extra`.
#[derive(Clone)]
pub struct DeliveryContext {
    /// Bot/app authentication token for the platform API.
    pub auth_token: String,
    /// Platform-specific channel/conversation ID.
    pub channel_id: String,
    /// Thread reference for reply targeting.
    pub thread_ref: String,
    /// Reply mode controlling which output is delivered.
    pub reply_mode: ChannelReplyMode,
    /// Platform-specific extra context (e.g. team_id, workspace URL).
    pub extra: HashMap<String, String>,
}
272
273impl std::fmt::Debug for DeliveryContext {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        f.debug_struct("DeliveryContext")
276            .field("auth_token", &"[REDACTED]")
277            .field("channel_id", &self.channel_id)
278            .field("thread_ref", &self.thread_ref)
279            .field("reply_mode", &self.reply_mode)
280            .field("extra", &self.extra)
281            .finish()
282    }
283}
284
/// Result of a delivery attempt.
///
/// Returned by `ChannelDeliveryAdapter::deliver` and `send_ack`. This is a
/// plain enum, not `std::result::Result`, so `?` does not apply to it.
/// Both error variants carry a `String` payload.
#[derive(Debug)]
pub enum DeliveryResult {
    /// Message delivered successfully.
    Ok,
    /// Transient failure — caller should retry with backoff.
    TransientError(String),
    /// Permanent failure — do not retry (e.g. invalid token, channel deleted).
    PermanentError(String),
}
295
296// ============================================
297// Session strategy (generalized from Slack)
298// ============================================
299
300/// Channel-agnostic session routing tag builder.
301///
302/// Given platform metadata from an InboundChannelEvent, produces the
303/// session tags used to find or create the correct session.
304pub fn build_session_routing_tag(
305    platform: &str,
306    strategy: &SessionRoutingStrategy,
307    metadata: &HashMap<String, String>,
308) -> Option<String> {
309    match strategy {
310        SessionRoutingStrategy::PerThread => metadata
311            .get("thread_ref")
312            .map(|t| format!("{}:thread:{}", platform, t)),
313        SessionRoutingStrategy::PerChannel => metadata
314            .get("channel_id")
315            .map(|c| format!("{}:channel:{}", platform, c)),
316        SessionRoutingStrategy::PerUser => metadata
317            .get("user_id")
318            .map(|u| format!("{}:user:{}", platform, u)),
319    }
320}
321
/// How incoming messages map to sessions — generalized from Slack's SessionStrategy.
///
/// Serialized in snake_case: `"per_thread"`, `"per_channel"`, `"per_user"`.
/// `build_session_routing_tag` turns a strategy plus routing metadata into
/// the concrete session tag.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionRoutingStrategy {
    /// Each thread gets its own session (default).
    /// Tag is built from the `"thread_ref"` metadata entry.
    #[default]
    PerThread,
    /// One session per channel/conversation.
    /// Tag is built from the `"channel_id"` metadata entry.
    PerChannel,
    /// One session per user across all threads.
    /// Tag is built from the `"user_id"` metadata entry.
    PerUser,
}
334
335// TODO(platform-tools): Channel adapters should optionally contribute
336// platform-specific tools via the Capability trait. Examples:
337// - Slack: add_reaction, post_to_channel, create_thread, upload_file
338// - Discord: create_thread, add_reaction, pin_message
339// - Teams: send_adaptive_card, create_tab
340// The plumbing exists (Capability::tools()), but no adapter uses it yet.
341// When implementing, tools should receive platform context via ToolContext
342// (which already has session access for looking up channel config).
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: an `ExternalActor` with the given id, optional display name
    /// and source, and no metadata.
    fn make_actor(id: &str, name: Option<&str>, source: &str) -> ExternalActor {
        ExternalActor {
            actor_id: id.into(),
            actor_name: name.map(Into::into),
            source: source.into(),
            metadata: None,
        }
    }

    /// Fixture: a routing-metadata map holding exactly one entry.
    fn single_entry(key: &str, value: &str) -> HashMap<String, String> {
        std::iter::once((key.to_string(), value.to_string())).collect()
    }

    // ---- ThreadContext ----

    #[test]
    fn test_thread_context_track_participant() {
        let alice = make_actor("U001", Some("Alice"), "slack");
        let mut ctx = ThreadContext::new("1234.5678", "slack");

        let first_sighting = ctx.track_participant(&alice);
        let second_sighting = ctx.track_participant(&alice);

        assert!(first_sighting);
        assert!(!second_sighting); // a repeat sighting is not new
        assert_eq!(ctx.participant_count(), 1);
    }

    #[test]
    fn test_thread_context_updates_display_name() {
        let before_rename = make_actor("U001", Some("Alice"), "slack");
        let after_rename = make_actor("U001", Some("Alice B."), "slack");
        let mut ctx = ThreadContext::new("thread_1", "slack");

        ctx.track_participant(&before_rename);
        ctx.track_participant(&after_rename);

        let stored = ctx.participants.get("U001").unwrap();
        assert_eq!(stored.actor.actor_name.as_deref(), Some("Alice B."));
    }

    #[test]
    fn test_thread_context_participants_summary() {
        let mut ctx = ThreadContext::new("thread_1", "discord");
        assert_eq!(ctx.participants_summary(), "");

        ctx.track_participant(&make_actor("U001", Some("Alice"), "discord"));
        ctx.track_participant(&make_actor("U002", None, "discord"));

        let summary = ctx.participants_summary();
        assert!(summary.contains("Alice"));
        assert!(summary.contains("U002")); // nameless actor falls back to actor_id
    }

    // ---- build_session_routing_tag ----

    #[test]
    fn test_build_session_routing_tag_per_thread() {
        let meta = single_entry("thread_ref", "1234.5678");
        let tag = build_session_routing_tag("slack", &SessionRoutingStrategy::PerThread, &meta);
        assert_eq!(tag, Some("slack:thread:1234.5678".into()));
    }

    #[test]
    fn test_build_session_routing_tag_per_channel() {
        let meta = single_entry("channel_id", "C0123");
        let tag = build_session_routing_tag("discord", &SessionRoutingStrategy::PerChannel, &meta);
        assert_eq!(tag, Some("discord:channel:C0123".into()));
    }

    #[test]
    fn test_build_session_routing_tag_per_user() {
        let meta = single_entry("user_id", "U999");
        let tag = build_session_routing_tag("teams", &SessionRoutingStrategy::PerUser, &meta);
        assert_eq!(tag, Some("teams:user:U999".into()));
    }

    #[test]
    fn test_build_session_routing_tag_missing_metadata() {
        let empty: HashMap<String, String> = HashMap::new();
        let tag = build_session_routing_tag("slack", &SessionRoutingStrategy::PerThread, &empty);
        assert_eq!(tag, None);
    }

    // ---- ChannelReplyMode ----

    #[test]
    fn test_channel_reply_mode_default() {
        assert_eq!(ChannelReplyMode::default(), ChannelReplyMode::AllMessages);
    }

    #[test]
    fn test_channel_reply_mode_serde_roundtrip() {
        let encoded = serde_json::to_string(&ChannelReplyMode::ReportProgressOnly).unwrap();
        assert_eq!(encoded, r#""report_progress_only""#);

        let decoded: ChannelReplyMode = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, ChannelReplyMode::ReportProgressOnly);
    }

    // ---- SessionRoutingStrategy ----

    #[test]
    fn test_session_routing_strategy_default() {
        assert_eq!(
            SessionRoutingStrategy::default(),
            SessionRoutingStrategy::PerThread
        );
    }

    #[test]
    fn test_session_routing_strategy_serde() {
        let encoded = serde_json::to_string(&SessionRoutingStrategy::PerChannel).unwrap();
        assert_eq!(encoded, r#""per_channel""#);

        let decoded: SessionRoutingStrategy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, SessionRoutingStrategy::PerChannel);
    }
}