Skip to main content

relaycast/
events.rs

1//! Helpers for consuming raw RelayCast WebSocket events.
2
3use std::{
4    collections::hash_map::DefaultHasher,
5    hash::{Hash, Hasher},
6};
7
8use serde::{Deserialize, Serialize};
9use serde_json::{Map, Value};
10
11/// Normalized inbound event kind for message-like WebSocket events.
///
/// Several raw event `type` strings collapse onto each variant (the mapping
/// lives in `parse_inbound_kind`). Variants serialize in `snake_case`.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "snake_case")]
14pub enum NormalizedEventKind {
    /// A `message.*` event (`message.created`, `message.received`, ...).
15    MessageCreated,
    /// A `dm.*` / `direct_message.*` event, or a `message.*` event that has
    /// no channel but does carry a conversation id.
16    DmReceived,
    /// A `thread.reply` or `thread.message.*` event.
17    ThreadReply,
    /// A `group_dm.*` event.
18    GroupDmReceived,
    /// An `agent.status.*`, `user.online` or `user.offline` event.
19    Presence,
    /// A `message.reacted` event.
20    ReactionReceived,
21}
22
23/// Normalized sender kind extracted from event metadata.
///
/// Variants serialize in `snake_case`.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum SenderKind {
    /// The event metadata labelled the sender `human` or `user`.
27    Human,
    /// The event metadata labelled the sender `agent`, `bot` or `assistant`;
    /// also used unconditionally for presence and reaction events.
28    Agent,
    /// No recognised sender label was found on the event.
29    Unknown,
30}
31
32/// Suggested delivery priority for a normalized inbound event.
///
/// Serialized as the upper-case variant name.
/// NOTE(review): presumably a lower number means more urgent — confirm with
/// the consumers of this value.
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34#[serde(rename_all = "UPPERCASE")]
35pub enum RelayPriority {
    /// Assigned to direct messages.
36    P2,
    /// Assigned to channel messages, thread replies and group DMs.
37    P3,
    /// Assigned to presence changes and reactions.
38    P4,
39}
40
41/// A stable, SDK-owned shape for inbound WebSocket events that need routing.
///
/// Produced by `normalize_inbound_event`. Optional fields are left out of the
/// serialized form when they are `None`.
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
43pub struct NormalizedInboundEvent {
    /// Identifier taken from the event when it carries one, otherwise a
    /// synthesized one (`synthetic-…`, `presence-…` or `reaction-…`).
44    pub event_id: String,
    /// Which family of raw event this was normalized from.
45    pub kind: NormalizedEventKind,
    /// Sender display name; `"unknown"` when none could be extracted.
46    pub from: String,
    /// The sender's `agent_id`, when the event provides one.
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub sender_agent_id: Option<String>,
    /// Whether the sender was labelled as a human or an agent.
49    pub sender_kind: SenderKind,
    /// Destination: a `#`-prefixed channel, an explicit recipient, or a
    /// conversation id. `"unknown"` when nothing matched; empty for presence.
50    pub target: String,
    /// Message text; empty when the event carried none (and for presence).
51    pub text: String,
    /// Thread / parent message id, when present.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub thread_id: Option<String>,
    /// Suggested delivery priority derived from `kind`.
54    pub priority: RelayPriority,
55}
56
57/// A normalized command invocation event.
///
/// Produced by `normalize_command_invocation` from a `command.invoked` event.
/// Optional fields are left out of the serialized form when they are `None`.
58#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
59pub struct NormalizedCommandInvocation {
    /// The `command` string from the event (required).
60    pub command: String,
    /// The event's `channel`; empty when the event has none.
61    pub channel: String,
    /// The event's `invoked_by`; `"unknown"` when the event has none.
62    pub invoked_by: String,
    /// `handler_agent_id`, falling back to `handler.id`.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub handler_agent_id: Option<String>,
    /// The scalar `args` value, when present.
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub args: Option<String>,
    /// The `parameters` object, when present and actually a JSON object.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub parameters: Option<Map<String, Value>>,
69}
70
71/// Normalize a raw WebSocket event value into a routeable message-like event.
72///
73/// The parser accepts current top-level event fields and payload-wrapped event
74/// shapes. It intentionally returns `None` for command events; use
75/// [`normalize_command_invocation`] for `command.invoked`.
76pub fn normalize_inbound_event(value: &Value) -> Option<NormalizedInboundEvent> {
77    let accessor = EventAccessor::new(value);
78    let event_type = accessor
79        .field(EventNesting::Top, "type")
80        .and_then(Value::as_str)?;
81    let mut kind = parse_inbound_kind(event_type)?;
82
83    if matches!(kind, NormalizedEventKind::MessageCreated)
84        && extract_channel(accessor).is_none()
85        && has_conversation_context(accessor)
86    {
87        kind = NormalizedEventKind::DmReceived;
88    }
89
90    if matches!(
91        kind,
92        NormalizedEventKind::MessageCreated
93            | NormalizedEventKind::DmReceived
94            | NormalizedEventKind::ThreadReply
95            | NormalizedEventKind::GroupDmReceived
96    ) {
97        let has_message = accessor
98            .nested(EventNesting::Message)
99            .is_some_and(Value::is_object)
100            || accessor
101                .nested(EventNesting::PayloadMessage)
102                .is_some_and(Value::is_object);
103        if !has_message && extract_text(accessor).is_none() {
104            return None;
105        }
106    }
107
108    if matches!(kind, NormalizedEventKind::Presence) {
109        let from = extract_presence_sender(accessor).unwrap_or_else(|| "unknown".to_string());
110        return Some(NormalizedInboundEvent {
111            event_id: format!("presence-{event_type}-{from}"),
112            kind,
113            from,
114            sender_agent_id: None,
115            sender_kind: SenderKind::Agent,
116            target: String::new(),
117            text: String::new(),
118            thread_id: None,
119            priority: RelayPriority::P4,
120        });
121    }
122
123    if matches!(kind, NormalizedEventKind::ReactionReceived) {
124        return normalize_reaction(accessor, event_type, kind);
125    }
126
127    let from = extract_sender(accessor).unwrap_or_else(|| "unknown".to_string());
128    let sender_agent_id = extract_sender_agent_id(accessor);
129    let sender_kind = parse_sender_kind(accessor);
130    let target = extract_target(accessor, kind).unwrap_or_else(|| "unknown".to_string());
131    let text = extract_text(accessor).unwrap_or_default();
132    let thread_id = extract_thread_id(accessor);
133    let event_id = extract_event_id(accessor)
134        .unwrap_or_else(|| synth_event_id(event_type, &from, &target, &text, thread_id.as_deref()));
135    let priority = match kind {
136        NormalizedEventKind::DmReceived => RelayPriority::P2,
137        NormalizedEventKind::MessageCreated
138        | NormalizedEventKind::ThreadReply
139        | NormalizedEventKind::GroupDmReceived => RelayPriority::P3,
140        NormalizedEventKind::Presence | NormalizedEventKind::ReactionReceived => RelayPriority::P4,
141    };
142
143    Some(NormalizedInboundEvent {
144        event_id,
145        kind,
146        from,
147        sender_agent_id,
148        sender_kind,
149        target,
150        text,
151        thread_id,
152        priority,
153    })
154}
155
156/// Normalize a raw `command.invoked` WebSocket event.
157pub fn normalize_command_invocation(value: &Value) -> Option<NormalizedCommandInvocation> {
158    let event_type = value.get("type")?.as_str()?;
159    if event_type != "command.invoked" {
160        return None;
161    }
162
163    Some(NormalizedCommandInvocation {
164        command: value.get("command")?.as_str()?.to_string(),
165        channel: value
166            .get("channel")
167            .and_then(scalar_to_string)
168            .unwrap_or_default(),
169        invoked_by: value
170            .get("invoked_by")
171            .and_then(scalar_to_string)
172            .unwrap_or_else(|| "unknown".to_string()),
173        handler_agent_id: value
174            .get("handler_agent_id")
175            .and_then(scalar_to_string)
176            .or_else(|| {
177                value
178                    .get("handler")
179                    .and_then(|handler| handler.get("id"))
180                    .and_then(scalar_to_string)
181            }),
182        args: value.get("args").and_then(scalar_to_string),
183        parameters: value.get("parameters").and_then(Value::as_object).cloned(),
184    })
185}
186
187fn normalize_reaction(
188    accessor: EventAccessor<'_>,
189    _event_type: &str,
190    kind: NormalizedEventKind,
191) -> Option<NormalizedInboundEvent> {
192    let from = accessor
193        .field(EventNesting::Top, "agent_name")
194        .and_then(scalar_to_string)
195        .unwrap_or_else(|| "unknown".to_string());
196    let emoji = accessor
197        .field(EventNesting::Top, "emoji")
198        .and_then(scalar_to_string)
199        .unwrap_or_else(|| "?".to_string());
200    let message_id = accessor
201        .field(EventNesting::Top, "message_id")
202        .and_then(scalar_to_string)
203        .unwrap_or_default();
204    let channel_name = accessor
205        .field(EventNesting::Top, "channel_name")
206        .and_then(scalar_to_string);
207    let target = match channel_name {
208        Some(channel) if channel.starts_with('#') => channel,
209        Some(channel) if !channel.is_empty() => format!("#{channel}"),
210        _ => return None,
211    };
212
213    Some(NormalizedInboundEvent {
214        event_id: format!("reaction-{message_id}-{from}-{emoji}"),
215        kind,
216        from: from.clone(),
217        sender_agent_id: None,
218        sender_kind: SenderKind::Agent,
219        target,
220        text: format!(
221            ":{emoji}: reaction from {from} on message {message_id} (informational; no response required)"
222        ),
223        thread_id: None,
224        priority: RelayPriority::P4,
225    })
226}
227
/// Where inside a raw event a field is looked up (see `EventAccessor`).
228#[derive(Clone, Copy)]
229enum EventNesting {
    /// The event object itself.
230    Top,
    /// The event's top-level `message` value.
231    Message,
    /// The event's top-level `payload` value.
232    Payload,
    /// The `message` value nested inside `payload`.
233    PayloadMessage,
234}
235
/// Borrowed view over a raw event with its nested containers resolved once.
///
/// `Copy`, so it is passed by value to the extraction helpers.
236#[derive(Clone, Copy)]
237struct EventAccessor<'a> {
    /// The raw event value.
238    top: &'a Value,
    /// `top["message"]`, if present.
239    message: Option<&'a Value>,
    /// `top["payload"]`, if present.
240    payload: Option<&'a Value>,
    /// `top["payload"]["message"]`, if present.
241    payload_message: Option<&'a Value>,
242}
243
244impl<'a> EventAccessor<'a> {
245    fn new(top: &'a Value) -> Self {
246        let payload = top.get("payload");
247        let message = top.get("message");
248        let payload_message = payload.and_then(|nested| nested.get("message"));
249
250        Self {
251            top,
252            message,
253            payload,
254            payload_message,
255        }
256    }
257
258    fn nested(self, nesting: EventNesting) -> Option<&'a Value> {
259        match nesting {
260            EventNesting::Top => Some(self.top),
261            EventNesting::Message => self.message,
262            EventNesting::Payload => self.payload,
263            EventNesting::PayloadMessage => self.payload_message,
264        }
265    }
266
267    fn field(self, nesting: EventNesting, key: &str) -> Option<&'a Value> {
268        self.nested(nesting)?.get(key)
269    }
270
271    fn agent_name(self, nesting: EventNesting) -> Option<&'a Value> {
272        self.field(nesting, "agent")
273            .and_then(|agent| agent.get("name"))
274    }
275
276    fn first_string<F>(
277        self,
278        candidates: &[(EventNesting, &str)],
279        mut convert: F,
280        require_non_empty: bool,
281    ) -> Option<String>
282    where
283        F: FnMut(&Value) -> Option<String>,
284    {
285        for (nesting, key) in candidates {
286            if let Some(value) = self.field(*nesting, key).and_then(&mut convert) {
287                if !require_non_empty || !value.is_empty() {
288                    return Some(value);
289                }
290            }
291        }
292        None
293    }
294
295    fn first_agent_name(self, nestings: &[EventNesting]) -> Option<String> {
296        for nesting in nestings {
297            if let Some(name) = self.agent_name(*nesting).and_then(scalar_to_string) {
298                if !name.is_empty() {
299                    return Some(name);
300                }
301            }
302        }
303        None
304    }
305
306    fn has_trimmed_non_empty_scalar(self, candidates: &[(EventNesting, &str)]) -> bool {
307        candidates.iter().any(|(nesting, key)| {
308            self.field(*nesting, key)
309                .and_then(Value::as_str)
310                .is_some_and(|value| !value.trim().is_empty())
311        })
312    }
313}
314
315fn parse_inbound_kind(event_type: &str) -> Option<NormalizedEventKind> {
316    match event_type {
317        "message.created" | "message.received" | "message.new" | "message.sent"
318        | "message.delivered" => Some(NormalizedEventKind::MessageCreated),
319        "dm.received"
320        | "dm.created"
321        | "dm.new"
322        | "dm.sent"
323        | "dm.message.created"
324        | "direct_message.received"
325        | "direct_message.created"
326        | "direct_message.new"
327        | "direct_message.sent" => Some(NormalizedEventKind::DmReceived),
328        "thread.reply" | "thread.message.created" | "thread.message.sent" => {
329            Some(NormalizedEventKind::ThreadReply)
330        }
331        "group_dm.received"
332        | "group_dm.created"
333        | "group_dm.new"
334        | "group_dm.sent"
335        | "group_dm.message.created" => Some(NormalizedEventKind::GroupDmReceived),
336        "agent.status.changed"
337        | "agent.status.idle"
338        | "agent.status.active"
339        | "agent.status.blocked"
340        | "agent.status.waiting"
341        | "agent.status.offline"
342        | "user.online"
343        | "user.offline" => Some(NormalizedEventKind::Presence),
344        "message.reacted" => Some(NormalizedEventKind::ReactionReceived),
345        _ => None,
346    }
347}
348
349fn extract_presence_sender(accessor: EventAccessor<'_>) -> Option<String> {
350    const AGENT_NAME_NESTINGS: [EventNesting; 2] = [EventNesting::Top, EventNesting::Payload];
351    const AGENT_NAME_FIELDS: [(EventNesting, &str); 2] = [
352        (EventNesting::Top, "agent_name"),
353        (EventNesting::Payload, "agent_name"),
354    ];
355    const FROM_FIELDS: [(EventNesting, &str); 2] =
356        [(EventNesting::Top, "from"), (EventNesting::Payload, "from")];
357
358    accessor
359        .first_agent_name(&AGENT_NAME_NESTINGS)
360        .or_else(|| accessor.first_string(&AGENT_NAME_FIELDS, scalar_to_string, true))
361        .or_else(|| accessor.first_string(&FROM_FIELDS, scalar_to_string, true))
362}
363
364fn extract_event_id(accessor: EventAccessor<'_>) -> Option<String> {
365    const EVENT_ID_FIELDS: [(EventNesting, &str); 12] = [
366        (EventNesting::Top, "event_id"),
367        (EventNesting::Top, "message_id"),
368        (EventNesting::Top, "id"),
369        (EventNesting::Message, "event_id"),
370        (EventNesting::Message, "message_id"),
371        (EventNesting::Message, "id"),
372        (EventNesting::Payload, "event_id"),
373        (EventNesting::Payload, "message_id"),
374        (EventNesting::Payload, "id"),
375        (EventNesting::PayloadMessage, "event_id"),
376        (EventNesting::PayloadMessage, "message_id"),
377        (EventNesting::PayloadMessage, "id"),
378    ];
379
380    accessor.first_string(&EVENT_ID_FIELDS, scalar_to_string, true)
381}
382
383fn extract_sender_agent_id(accessor: EventAccessor<'_>) -> Option<String> {
384    const FIELDS: [(EventNesting, &str); 4] = [
385        (EventNesting::Message, "agent_id"),
386        (EventNesting::PayloadMessage, "agent_id"),
387        (EventNesting::Top, "agent_id"),
388        (EventNesting::Payload, "agent_id"),
389    ];
390
391    accessor.first_string(&FIELDS, scalar_to_string, true)
392}
393
394fn extract_sender(accessor: EventAccessor<'_>) -> Option<String> {
395    const TOP_AGENT_NESTINGS: [EventNesting; 1] = [EventNesting::Top];
396    const TOP_FIELDS: [(EventNesting, &str); 6] = [
397        (EventNesting::Top, "from"),
398        (EventNesting::Top, "sender"),
399        (EventNesting::Top, "author"),
400        (EventNesting::Top, "from_agent"),
401        (EventNesting::Top, "agent"),
402        (EventNesting::Top, "agent_name"),
403    ];
404    const MESSAGE_FIELDS: [(EventNesting, &str); 6] = [
405        (EventNesting::Message, "from"),
406        (EventNesting::Message, "sender"),
407        (EventNesting::Message, "author"),
408        (EventNesting::Message, "from_agent"),
409        (EventNesting::Message, "agent"),
410        (EventNesting::Message, "agent_name"),
411    ];
412    const PAYLOAD_AGENT_NESTINGS: [EventNesting; 1] = [EventNesting::Payload];
413    const PAYLOAD_FIELDS: [(EventNesting, &str); 6] = [
414        (EventNesting::Payload, "from"),
415        (EventNesting::Payload, "sender"),
416        (EventNesting::Payload, "author"),
417        (EventNesting::Payload, "from_agent"),
418        (EventNesting::Payload, "agent"),
419        (EventNesting::Payload, "agent_name"),
420    ];
421    const PAYLOAD_MESSAGE_FIELDS: [(EventNesting, &str); 6] = [
422        (EventNesting::PayloadMessage, "from"),
423        (EventNesting::PayloadMessage, "sender"),
424        (EventNesting::PayloadMessage, "author"),
425        (EventNesting::PayloadMessage, "from_agent"),
426        (EventNesting::PayloadMessage, "agent"),
427        (EventNesting::PayloadMessage, "agent_name"),
428    ];
429
430    let raw = accessor
431        .first_agent_name(&TOP_AGENT_NESTINGS)
432        .or_else(|| accessor.first_string(&TOP_FIELDS, sender_value_to_string, true))
433        .or_else(|| accessor.first_string(&MESSAGE_FIELDS, sender_value_to_string, true))
434        .or_else(|| accessor.first_agent_name(&PAYLOAD_AGENT_NESTINGS))
435        .or_else(|| accessor.first_string(&PAYLOAD_FIELDS, sender_value_to_string, true))
436        .or_else(|| accessor.first_string(&PAYLOAD_MESSAGE_FIELDS, sender_value_to_string, true))?;
437
438    Some(normalize_sender_identity(&raw))
439}
440
/// Collapse relay infrastructure sender identities onto one display name.
///
/// `"broker"`, anything starting with `"broker-"`, and anything starting with
/// `"human:"` become `"Dashboard"`. Every other name is returned unchanged.
pub fn normalize_sender_identity(raw: &str) -> String {
    const INFRA_PREFIXES: [&str; 2] = ["broker-", "human:"];

    let is_infrastructure =
        raw == "broker" || INFRA_PREFIXES.iter().any(|prefix| raw.starts_with(*prefix));

    let display = if is_infrastructure { "Dashboard" } else { raw };
    display.to_owned()
}
448
449fn extract_target(accessor: EventAccessor<'_>, kind: NormalizedEventKind) -> Option<String> {
450    const EXPLICIT_TARGET_FIELDS: [(EventNesting, &str); 20] = [
451        (EventNesting::Top, "target"),
452        (EventNesting::Top, "to"),
453        (EventNesting::Top, "recipient"),
454        (EventNesting::Top, "to_agent"),
455        (EventNesting::Top, "recipient_agent"),
456        (EventNesting::Message, "target"),
457        (EventNesting::Message, "to"),
458        (EventNesting::Message, "recipient"),
459        (EventNesting::Message, "to_agent"),
460        (EventNesting::Message, "recipient_agent"),
461        (EventNesting::Payload, "target"),
462        (EventNesting::Payload, "to"),
463        (EventNesting::Payload, "recipient"),
464        (EventNesting::Payload, "to_agent"),
465        (EventNesting::Payload, "recipient_agent"),
466        (EventNesting::PayloadMessage, "target"),
467        (EventNesting::PayloadMessage, "to"),
468        (EventNesting::PayloadMessage, "recipient"),
469        (EventNesting::PayloadMessage, "to_agent"),
470        (EventNesting::PayloadMessage, "recipient_agent"),
471    ];
472    const CONVERSATION_DM_FIELDS: [(EventNesting, &str); 2] = [
473        (EventNesting::Top, "conversation_id"),
474        (EventNesting::Payload, "conversation_id"),
475    ];
476    const CONVERSATION_FIELDS: [(EventNesting, &str); 4] = [
477        (EventNesting::Top, "conversation_id"),
478        (EventNesting::Message, "conversation_id"),
479        (EventNesting::Payload, "conversation_id"),
480        (EventNesting::PayloadMessage, "conversation_id"),
481    ];
482
483    if matches!(
484        kind,
485        NormalizedEventKind::DmReceived | NormalizedEventKind::GroupDmReceived
486    ) {
487        if let Some(target) =
488            accessor.first_string(&EXPLICIT_TARGET_FIELDS, sender_value_to_string, true)
489        {
490            return Some(target);
491        }
492        if let Some(target) = accessor.first_string(&CONVERSATION_DM_FIELDS, scalar_to_string, true)
493        {
494            return Some(target);
495        }
496    }
497
498    if let Some(channel) = extract_channel(accessor) {
499        return Some(channel);
500    }
501
502    if let Some(target) =
503        accessor.first_string(&EXPLICIT_TARGET_FIELDS, sender_value_to_string, true)
504    {
505        return Some(target);
506    }
507
508    if let Some(target) = accessor.first_string(&CONVERSATION_FIELDS, scalar_to_string, true) {
509        return Some(target);
510    }
511
512    if matches!(kind, NormalizedEventKind::ThreadReply) {
513        return Some("thread".to_string());
514    }
515
516    None
517}
518
519fn has_conversation_context(accessor: EventAccessor<'_>) -> bool {
520    const CONVERSATION_FIELDS: [(EventNesting, &str); 4] = [
521        (EventNesting::Top, "conversation_id"),
522        (EventNesting::Message, "conversation_id"),
523        (EventNesting::Payload, "conversation_id"),
524        (EventNesting::PayloadMessage, "conversation_id"),
525    ];
526
527    accessor.has_trimmed_non_empty_scalar(&CONVERSATION_FIELDS)
528}
529
/// Derive an event id for an event that does not carry one.
///
/// The id is `synthetic-{event_type}-` followed by a 16-digit hex hash of the
/// event type, sender, target, text and thread id (a missing thread id is
/// hashed as the empty string), fed to a `DefaultHasher` in that order.
fn synth_event_id(
    event_type: &str,
    from: &str,
    target: &str,
    text: &str,
    thread_id: Option<&str>,
) -> String {
    let mut digest = DefaultHasher::new();
    let parts = [event_type, from, target, text, thread_id.unwrap_or_default()];
    for part in parts.iter() {
        part.hash(&mut digest);
    }
    let fingerprint = digest.finish();
    format!("synthetic-{event_type}-{fingerprint:016x}")
}
545
546fn extract_channel(accessor: EventAccessor<'_>) -> Option<String> {
547    const CHANNEL_FIELDS: [(EventNesting, &str); 4] = [
548        (EventNesting::Top, "channel"),
549        (EventNesting::Message, "channel"),
550        (EventNesting::Payload, "channel"),
551        (EventNesting::PayloadMessage, "channel"),
552    ];
553
554    for (nesting, key) in CHANNEL_FIELDS {
555        if let Some(raw) = accessor.field(nesting, key).and_then(scalar_to_string) {
556            if raw.is_empty() {
557                continue;
558            }
559            if raw.starts_with('#') {
560                return Some(raw);
561            }
562            return Some(format!("#{raw}"));
563        }
564    }
565    None
566}
567
568fn extract_text(accessor: EventAccessor<'_>) -> Option<String> {
569    const TEXT_FIELDS: [(EventNesting, &str); 12] = [
570        (EventNesting::Top, "text"),
571        (EventNesting::Top, "body"),
572        (EventNesting::Top, "content"),
573        (EventNesting::Message, "text"),
574        (EventNesting::Message, "body"),
575        (EventNesting::Message, "content"),
576        (EventNesting::Payload, "text"),
577        (EventNesting::Payload, "body"),
578        (EventNesting::Payload, "content"),
579        (EventNesting::PayloadMessage, "text"),
580        (EventNesting::PayloadMessage, "body"),
581        (EventNesting::PayloadMessage, "content"),
582    ];
583
584    if let Some(text) = accessor.first_string(&TEXT_FIELDS, scalar_to_string, false) {
585        return Some(text);
586    }
587
588    if let Some(raw_message) = accessor
589        .field(EventNesting::Top, "message")
590        .and_then(Value::as_str)
591    {
592        return Some(raw_message.to_string());
593    }
594    if let Some(raw_message) = accessor
595        .field(EventNesting::Payload, "message")
596        .and_then(Value::as_str)
597    {
598        return Some(raw_message.to_string());
599    }
600
601    None
602}
603
604fn extract_thread_id(accessor: EventAccessor<'_>) -> Option<String> {
605    const THREAD_FIELDS: [(EventNesting, &str); 6] = [
606        (EventNesting::Top, "parent_id"),
607        (EventNesting::Top, "thread_id"),
608        (EventNesting::Message, "thread_id"),
609        (EventNesting::Payload, "parent_id"),
610        (EventNesting::Payload, "thread_id"),
611        (EventNesting::PayloadMessage, "thread_id"),
612    ];
613
614    accessor.first_string(&THREAD_FIELDS, scalar_to_string, true)
615}
616
617fn sender_value_to_string(value: &Value) -> Option<String> {
618    if let Some(s) = scalar_to_string(value) {
619        return Some(s);
620    }
621
622    let obj = value.as_object()?;
623    for key in ["name", "display_name", "username", "handle", "id"] {
624        if let Some(v) = obj.get(key) {
625            if let Some(s) = scalar_to_string(v) {
626                if !s.is_empty() {
627                    return Some(s);
628                }
629            }
630        }
631    }
632    None
633}
634
635fn scalar_to_string(value: &Value) -> Option<String> {
636    match value {
637        Value::String(s) => Some(s.clone()),
638        Value::Number(n) => Some(n.to_string()),
639        _ => None,
640    }
641}
642
643fn parse_sender_kind(accessor: EventAccessor<'_>) -> SenderKind {
644    const KIND_FIELDS: [(EventNesting, &str); 24] = [
645        (EventNesting::Top, "from_type"),
646        (EventNesting::Top, "sender_type"),
647        (EventNesting::Top, "actor_type"),
648        (EventNesting::Top, "source_type"),
649        (EventNesting::Top, "origin_type"),
650        (EventNesting::Top, "sender_kind"),
651        (EventNesting::Message, "from_type"),
652        (EventNesting::Message, "sender_type"),
653        (EventNesting::Message, "actor_type"),
654        (EventNesting::Message, "source_type"),
655        (EventNesting::Message, "origin_type"),
656        (EventNesting::Message, "sender_kind"),
657        (EventNesting::Payload, "from_type"),
658        (EventNesting::Payload, "sender_type"),
659        (EventNesting::Payload, "actor_type"),
660        (EventNesting::Payload, "source_type"),
661        (EventNesting::Payload, "origin_type"),
662        (EventNesting::Payload, "sender_kind"),
663        (EventNesting::PayloadMessage, "from_type"),
664        (EventNesting::PayloadMessage, "sender_type"),
665        (EventNesting::PayloadMessage, "actor_type"),
666        (EventNesting::PayloadMessage, "source_type"),
667        (EventNesting::PayloadMessage, "origin_type"),
668        (EventNesting::PayloadMessage, "sender_kind"),
669    ];
670    const CONTAINER_NESTINGS: [EventNesting; 4] = [
671        EventNesting::Top,
672        EventNesting::Message,
673        EventNesting::Payload,
674        EventNesting::PayloadMessage,
675    ];
676
677    for (nesting, key) in KIND_FIELDS {
678        if let Some(kind) = accessor
679            .field(nesting, key)
680            .and_then(Value::as_str)
681            .and_then(parse_sender_kind_label)
682        {
683            return kind;
684        }
685    }
686
687    for nesting in CONTAINER_NESTINGS {
688        if let Some(kind) = accessor
689            .nested(nesting)
690            .and_then(Value::as_object)
691            .and_then(parse_sender_kind_from_containers)
692        {
693            return kind;
694        }
695    }
696
697    SenderKind::Unknown
698}
699
700fn parse_sender_kind_label(raw: &str) -> Option<SenderKind> {
701    match raw.trim().to_ascii_lowercase().as_str() {
702        "human" | "user" => Some(SenderKind::Human),
703        "agent" | "bot" | "assistant" => Some(SenderKind::Agent),
704        _ => None,
705    }
706}
707
708fn parse_sender_kind_from_containers(payload: &Map<String, Value>) -> Option<SenderKind> {
709    for container in ["from", "sender", "author"] {
710        if let Some(kind) = payload
711            .get(container)
712            .and_then(Value::as_object)
713            .and_then(|obj| {
714                obj.get("type")
715                    .or_else(|| obj.get("kind"))
716                    .or_else(|| obj.get("role"))
717            })
718            .and_then(Value::as_str)
719            .and_then(parse_sender_kind_label)
720        {
721            return Some(kind);
722        }
723    }
724    None
725}
726
// Unit tests covering the public normalization helpers with literal JSON events.
727#[cfg(test)]
728mod tests {
729    use serde_json::json;
730
731    use super::{
732        normalize_command_invocation, normalize_inbound_event, normalize_sender_identity,
733        NormalizedEventKind, RelayPriority, SenderKind,
734    };
735
    // Channel message whose details live in a top-level `message` object.
736    #[test]
737    fn normalizes_message_created_top_level() {
738        let event = normalize_inbound_event(&json!({
739            "type": "message.created",
740            "channel": "general",
741            "message": {
742                "id": "msg_1",
743                "agent_id": "agent_1",
744                "agent_name": "alice",
745                "text": "hello"
746            }
747        }))
748        .expect("message event should normalize");
749
750        assert_eq!(event.kind, NormalizedEventKind::MessageCreated);
751        assert_eq!(event.event_id, "msg_1");
752        assert_eq!(event.from, "alice");
753        assert_eq!(event.sender_agent_id.as_deref(), Some("agent_1"));
754        assert_eq!(event.target, "#general");
755        assert_eq!(event.text, "hello");
756        assert_eq!(event.priority, RelayPriority::P3);
757    }
758
    // DM wrapped in `payload`; the `broker` sender is shown as "Dashboard".
759    #[test]
760    fn normalizes_payload_wrapped_dm() {
761        let event = normalize_inbound_event(&json!({
762            "type": "dm.received",
763            "payload": {
764                "conversation_id": "dm_1",
765                "message": {
766                    "id": "msg_2",
767                    "agent_name": "broker",
768                    "text": "ping"
769                }
770            }
771        }))
772        .expect("payload-wrapped dm should normalize");
773
774        assert_eq!(event.kind, NormalizedEventKind::DmReceived);
775        assert_eq!(event.from, "Dashboard");
776        assert_eq!(event.target, "dm_1");
777        assert_eq!(event.text, "ping");
778        assert_eq!(event.priority, RelayPriority::P2);
779    }
780
    // `message.created` with a conversation id and no channel is upgraded to a DM.
781    #[test]
782    fn treats_message_created_with_conversation_as_dm() {
783        let event = normalize_inbound_event(&json!({
784            "type": "message.created",
785            "conversation_id": "dm_2",
786            "message": {
787                "id": "msg_3",
788                "agent_name": "lead",
789                "text": "direct"
790            }
791        }))
792        .expect("conversation message should normalize");
793
794        assert_eq!(event.kind, NormalizedEventKind::DmReceived);
795        assert_eq!(event.target, "dm_2");
796    }
797
    // Reaction events target the `#`-prefixed channel name at priority P4.
798    #[test]
799    fn normalizes_reaction_with_channel_context() {
800        let event = normalize_inbound_event(&json!({
801            "type": "message.reacted",
802            "message_id": "msg_4",
803            "agent_name": "alice",
804            "emoji": "eyes",
805            "action": "added",
806            "channel_name": "general"
807        }))
808        .expect("reaction should normalize");
809
810        assert_eq!(event.kind, NormalizedEventKind::ReactionReceived);
811        assert_eq!(event.target, "#general");
812        assert_eq!(event.priority, RelayPriority::P4);
813    }
814
    // Command events: the handler id falls back to `handler.id`.
815    #[test]
816    fn normalizes_command_invocation() {
817        let command = normalize_command_invocation(&json!({
818            "type": "command.invoked",
819            "command": "/spawn",
820            "channel": "general",
821            "invoked_by": "lead",
822            "handler": { "id": "agent_handler" },
823            "args": "worker-1",
824            "parameters": { "name": "worker-1", "cli": "codex" }
825        }))
826        .expect("command should normalize");
827
828        assert_eq!(command.command, "/spawn");
829        assert_eq!(command.handler_agent_id.as_deref(), Some("agent_handler"));
830        assert_eq!(
831            command
832                .parameters
833                .as_ref()
834                .and_then(|params| params.get("cli"))
835                .and_then(|value| value.as_str()),
836            Some("codex")
837        );
838    }
839
    // Sender name and kind are both read from a nested `from` object.
840    #[test]
841    fn extracts_sender_kind_from_nested_object() {
842        let event = normalize_inbound_event(&json!({
843            "type": "dm.received",
844            "conversation_id": "dm_3",
845            "message": {
846                "id": "msg_5",
847                "from": { "name": "Will", "type": "human" },
848                "text": "hello"
849            }
850        }))
851        .expect("nested sender should normalize");
852
853        assert_eq!(event.from, "Will");
854        assert_eq!(event.sender_kind, SenderKind::Human);
855    }
856
    // A message event with neither a message object nor text is rejected.
857    #[test]
858    fn rejects_malformed_message_events() {
859        assert!(normalize_inbound_event(&json!({
860            "type": "message.created",
861            "channel": "general"
862        }))
863        .is_none());
864    }
865
    // Infrastructure identities collapse to "Dashboard"; other names pass through.
866    #[test]
867    fn normalizes_infrastructure_sender_identity() {
868        assert_eq!(normalize_sender_identity("broker"), "Dashboard");
869        assert_eq!(normalize_sender_identity("broker-abc123"), "Dashboard");
870        assert_eq!(normalize_sender_identity("human:orchestrator"), "Dashboard");
871        assert_eq!(normalize_sender_identity("alice"), "alice");
872    }
873}