car-server-core 0.30.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
//! Slack approval-transport adapter (Unit 4) — the maximally-different SECOND
//! channel that proves the channel-agnostic seam holds.
//!
//! Where the iMessage adapter is poll-based with a text-code grammar and a
//! per-approval `CodeMap`, the Slack adapter is **push-based** (Socket Mode
//! WebSocket) with **Block Kit buttons** that carry the `approval_id`
//! DIRECTLY — there is NO `CodeMap` and NO text-code grammar on the Slack
//! side. The two channels share ONLY the channel-agnostic [`ApprovalCore`]
//! (resolve by `approval_id`) and the per-channel
//! [`MessagingConfigStore`] (allowlist + pairing-code primitive). Neither
//! fakes the other's model.
//!
//! ## Mockable transport seam (MC-12 / MC-13)
//!
//! The Slack outbound (`chat.postMessage`) and inbound (Socket Mode event
//! source) sit behind [`SlackTransport`]. The production impl
//! ([`RealSlackTransport`]) drives `tokio-tungstenite` + `reqwest`; tests
//! substitute a mock (see `tests/slack_mock.rs`) so the e2e + pairing
//! round-trips run with NO live Slack — exactly the `SpySender`/`MessageSender`
//! pattern the iMessage gates use.
//!
//! ## The two inbound shapes (and ONLY these two)
//!
//! Slack inbound recognizes EXACTLY TWO message shapes and ignores everything
//! else (MC-6 anti-injection by construction — the parse output is a closed
//! set with NO config-mutation arm):
//!
//! 1. **Approve/Deny button interaction** (`block_actions`) — `action_id`
//!    names the verb, `value` carries the `approval_id`. Routes to
//!    [`ApprovalCore::resolve`] with the Slack transport principal.
//! 2. **Pairing-code DM** (`message.im`) — `user` is the member id, `text` is
//!    the code. Routes to
//!    [`MessagingConfigStore::validate_and_consume_pairing_code_for`] — the
//!    ONLY allowlist-bind edge reachable from a Slack inbound event (MC-13:
//!    code-proven pairing). A wrong code binds nothing; a bare member id with
//!    no code binds nothing.
//!
//! ## Token in keychain, never plaintext (MC-9)
//!
//! The bot token (`xoxb-`) and app-level token (`xapp-`) are stored via
//! [`car_secrets::SecretStore::put`] (OS keychain). The Slack
//! [`crate::channel::ChannelConfig`] on disk holds only a token *reference*
//! (a key name resolved against [`car_secrets::DEFAULT_SERVICE`]), never the
//! bearer value — so no `xoxb`/`xapp` literal ever lands in `messaging.json`.
//! The outbound transport fetches the bot token via
//! [`car_secrets::SecretStore::get`] at use-time.

use async_trait::async_trait;

use crate::approval_core::ApprovalCore;
use crate::channel::{CancelSignal, ChannelId, InboundChannel, InboundSink};
use crate::messaging_config::{MessagingConfigStore, PairingOutcome};

/// The system-raised principal the Slack adapter supplies to
/// [`ApprovalCore::resolve`]. The literal lives HERE (in the Slack adapter),
/// passed INTO the channel-agnostic core — it must NOT appear in
/// `approval_core.rs` (MC-3 edge), mirroring the iMessage adapter's
/// `imessage-transport` principal.
///
/// It is the first argument of the `core.resolve(...)` call made by the
/// adapter's button handler.
pub const SLACK_PRINCIPAL: &str = "slack-transport";

/// The `action_id` on the Approve button (Block Kit). A click delivers this
/// in `payload.actions[0].action_id`.
///
/// [`build_block_kit_message`] stamps it on the `primary`-styled button, and
/// the button handler compares the inbound `action_id` against it.
pub const APPROVE_ACTION_ID: &str = "approve_request";
/// The `action_id` on the Deny button.
///
/// [`build_block_kit_message`] stamps it on the `danger`-styled button.
pub const DENY_ACTION_ID: &str = "deny_request";

/// The resolution verb the Slack transport supplies on an approve — matches
/// the gate's `approve_label` so an inbound approve reads exactly like a
/// CarHost click (same as the iMessage adapter's `APPROVE`).
///
/// Selected when the clicked button's `action_id` equals
/// [`APPROVE_ACTION_ID`]; private to this module.
const APPROVE: &str = "approve";
/// The resolution verb on a deny.
///
/// Selected when the clicked button's `action_id` equals [`DENY_ACTION_ID`].
const DENY: &str = "deny";

// ===================================================================
// Keychain token reference (MC-9)
// ===================================================================

/// The keychain key (under [`car_secrets::DEFAULT_SERVICE`]) the Slack bot
/// token (`xoxb-`) is stored at. The on-disk Slack config holds this *name*,
/// never the bearer value.
///
/// [`provision_slack_tokens`] writes the bot token under this key and hands
/// the same name back in [`SlackTokenRefs::bot_token_key`].
pub const SLACK_BOT_TOKEN_KEY: &str = "SLACK_BOT_TOKEN";
/// The keychain key the Slack app-level token (`xapp-`) is stored at.
///
/// [`provision_slack_tokens`] writes the app-level token under this key and
/// hands the same name back in [`SlackTokenRefs::app_token_key`].
pub const SLACK_APP_TOKEN_KEY: &str = "SLACK_APP_TOKEN";

/// Write both Slack tokens into the OS keychain (MC-9) and report the key
/// names they were stored under.
///
/// The bot token goes in first, under [`SLACK_BOT_TOKEN_KEY`]; the app-level
/// token follows, under [`SLACK_APP_TOKEN_KEY`]. Both live under
/// [`car_secrets::DEFAULT_SERVICE`]. Nothing here touches `messaging.json` —
/// this is the host/local-auth-gated provisioning path and must never be
/// reached from an inbound message.
///
/// # Errors
///
/// Returns `"store bot token: …"` or `"store app token: …"` when the
/// corresponding keychain write fails. The bot token is written first, so a
/// failure on the app token leaves the bot token already stored.
pub fn provision_slack_tokens(
    store: &car_secrets::SecretStore,
    bot_token: &str,
    app_token: &str,
) -> Result<SlackTokenRefs, String> {
    use car_secrets::{SecretRef, DEFAULT_SERVICE};

    let bot_ref = SecretRef::new(DEFAULT_SERVICE, SLACK_BOT_TOKEN_KEY);
    if let Err(err) = store.put(&bot_ref, bot_token) {
        return Err(format!("store bot token: {err}"));
    }

    let app_ref = SecretRef::new(DEFAULT_SERVICE, SLACK_APP_TOKEN_KEY);
    if let Err(err) = store.put(&app_ref, app_token) {
        return Err(format!("store app token: {err}"));
    }

    // Hand back the key NAMES only — the bearer values stay in the keychain.
    Ok(SlackTokenRefs {
        bot_token_key: String::from(SLACK_BOT_TOKEN_KEY),
        app_token_key: String::from(SLACK_APP_TOKEN_KEY),
    })
}

/// Keychain key REFERENCES for the Slack tokens — what the on-disk config
/// carries instead of the bearer values (MC-9). The actual `xoxb-`/`xapp-`
/// strings live only in the OS keychain.
///
/// Returned by [`provision_slack_tokens`], which fills the two fields with
/// [`SLACK_BOT_TOKEN_KEY`] and [`SLACK_APP_TOKEN_KEY`] respectively.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlackTokenRefs {
    /// Keychain key for the bot token (`xoxb-`). A key name, not the token.
    pub bot_token_key: String,
    /// Keychain key for the app-level token (`xapp-`). A key name, not the
    /// token.
    pub app_token_key: String,
}

/// Look up one secret in the OS keychain by reference key (MC-9 read side).
///
/// `key` is resolved under [`car_secrets::DEFAULT_SERVICE`]. The same lookup
/// serves the bot token and the app-level token at use-time, so neither is
/// ever read out of `messaging.json` as a value.
///
/// # Errors
///
/// Returns `"fetch secret <key>: …"` when the keychain lookup fails.
pub fn fetch_secret_by_ref(
    store: &car_secrets::SecretStore,
    key: &str,
) -> Result<String, String> {
    use car_secrets::{SecretRef, DEFAULT_SERVICE};

    let secret_ref = SecretRef::new(DEFAULT_SERVICE, key);
    match store.get(&secret_ref) {
        Ok(value) => Ok(value),
        Err(err) => Err(format!("fetch secret {key}: {err}")),
    }
}

// ===================================================================
// The mockable transport seam (MC-12 / MC-13)
// ===================================================================

/// One inbound Slack event the adapter acts on. **Closed set** — the
/// transport yields exactly one of these (or `Ignore`), so there is NO
/// inbound→config-mutation edge by construction (MC-6). A
/// config-mutation-shaped Slack message has no variant here and falls through
/// to `Ignore`.
///
/// `SlackAdapter::handle_event` is the consumer: it dispatches
/// `ButtonInteraction` to the button handler, `PairingDm` to the pairing-code
/// check, and does nothing for `Ignore`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlackInboundEvent {
    /// An Approve/Deny button click. `action_id` is the verb, `value` carries
    /// the `approval_id` directly (NO CodeMap). `user` is the clicker.
    ButtonInteraction {
        /// `payload.actions[0].action_id` — [`APPROVE_ACTION_ID`] /
        /// [`DENY_ACTION_ID`]. Any other value is ignored by the adapter.
        action_id: String,
        /// `payload.actions[0].value` — the `approval_id` (no grammar).
        value: String,
        /// `payload.user.id` — the Slack member who clicked. Checked against
        /// the Slack allowlist before anything resolves.
        user: String,
    },
    /// A DM to the bot (`message.im`). `user` is the member id, `text` is the
    /// (possibly-pairing) code. The ONLY shape that can bind a handle — and
    /// only via the constant-time pairing-code match.
    PairingDm {
        /// `event.user` — the member id to bind on a code match.
        user: String,
        /// `event.text` — the candidate pairing code.
        text: String,
    },
    /// Anything else (the bot's own echo, channel chatter, an unknown
    /// interaction). Ignored — never a config mutation.
    Ignore,
}

/// The Slack transport seam (MC-12 / MC-13). The production impl drives a
/// Socket Mode WebSocket + the Web API; tests substitute a mock that captures
/// outbound posts and injects inbound events.
///
/// Object-safe (`#[async_trait]`) so the adapter holds `Arc<dyn
/// SlackTransport>` and the mock and real impls are interchangeable.
/// `Send + Sync` is required because that `Arc` is shared across tasks.
#[async_trait]
pub trait SlackTransport: Send + Sync {
    /// Post a Block Kit approval message carrying Approve/Deny buttons whose
    /// `value` is the `approval_id`. `text` is the human-readable summary (also
    /// carries the shared fan-out code for MC-8 equality). Returns the posted
    /// message `ts` on success.
    ///
    /// # Errors
    ///
    /// An `Err(String)` describes why the post failed.
    /// `SlackAdapter::post_prompt` logs it at `warn` level and otherwise
    /// discards it.
    async fn post_message(
        &self,
        channel: &str,
        text: &str,
        approval_id: &str,
    ) -> Result<String, String>;

    /// Block until the next inbound Slack event (button click or DM), or
    /// `None` when the source is exhausted / cancelled. The adapter ACKs and
    /// dispatches each returned event. The transport is responsible for the
    /// Socket Mode ACK contract and for suppressing the bot's own echo
    /// (returning [`SlackInboundEvent::Ignore`] or skipping it).
    ///
    /// The adapter's inbound loop ends as soon as this returns `None`.
    async fn next_event(&self) -> Option<SlackInboundEvent>;
}

/// Assemble the JSON body of the outbound approval post.
///
/// Kept pure so the mock and the real transport emit the same wire shape and
/// tests can assert that the buttons carry the `approval_id` (MC-12). The
/// body has a top-level `text`, a `section` block rendering `text` as mrkdwn,
/// and an `actions` block (`block_id` `approval_actions`) holding the Approve
/// and Deny buttons. BOTH buttons use the raw `approval_id` as their `value` —
/// there is NO code grammar.
pub fn build_block_kit_message(text: &str, approval_id: &str) -> serde_json::Value {
    // One button element; only the action id, label and style differ between
    // Approve and Deny.
    let button = |action_id: &str, label: &str, style: &str| {
        serde_json::json!({
            "type": "button",
            "action_id": action_id,
            "text": { "type": "plain_text", "text": label, "emoji": true },
            "style": style,
            "value": approval_id
        })
    };

    let summary_block = serde_json::json!({
        "type": "section",
        "text": { "type": "mrkdwn", "text": text }
    });
    let actions_block = serde_json::json!({
        "type": "actions",
        "block_id": "approval_actions",
        "elements": [
            button(APPROVE_ACTION_ID, "Approve", "primary"),
            button(DENY_ACTION_ID, "Deny", "danger")
        ]
    });

    serde_json::json!({
        "text": text,
        "blocks": [summary_block, actions_block]
    })
}

// ===================================================================
// The Slack adapter
// ===================================================================

/// In-process Slack adapter. UNCONDITIONAL (no `#[cfg(target_os=...)]` — MC-11):
/// Slack is cross-platform, the whole point of the second channel proving the
/// seam holds without macOS. Holds the channel-agnostic [`ApprovalCore`]
/// (resolve by `approval_id`), the per-channel [`MessagingConfigStore`]
/// (allowlist + pairing), the [`SlackTransport`] seam, and the configured
/// Slack channel id to post into.
///
/// All fields are private; build one with `SlackAdapter::new`, which creates
/// `core` from the shared host state.
pub struct SlackAdapter {
    /// Channel-agnostic approval semantics. The `"slack-transport"` principal
    /// is supplied by this adapter at each `core.resolve(...)` call — it is not
    /// baked into the core (MC-3).
    core: ApprovalCore,
    /// Per-channel config + pairing store (the Slack section). Consulted for
    /// the enabled flag, the allowlist, and pairing-code validation.
    config: MessagingConfigStore,
    /// The mockable transport seam (Socket Mode + Web API in prod; a mock in
    /// tests).
    transport: std::sync::Arc<dyn SlackTransport>,
    /// The Slack channel/DM id the outbound prompt posts into. An empty
    /// string means "no channel configured" and turns posting into a no-op.
    channel: String,
}

impl SlackAdapter {
    /// Build a Slack adapter over the shared host, config store, transport
    /// seam, and the configured Slack channel id.
    pub fn new(
        host: std::sync::Arc<crate::host::HostState>,
        config: MessagingConfigStore,
        transport: std::sync::Arc<dyn SlackTransport>,
        channel: impl Into<String>,
    ) -> Self {
        Self {
            core: ApprovalCore::new(host),
            config,
            transport,
            channel: channel.into(),
        }
    }

    /// Post ONE Block Kit approval prompt for `approval_id` (carrying the
    /// shared fan-out `code` in the text for MC-8 equality) to the configured
    /// Slack channel. The button `value` carries the `approval_id` directly.
    /// Used by the fan-out outbound path (Unit 5) and by the e2e test.
    ///
    /// Gating: a disabled Slack channel is a silent no-op (zero posts) — the
    /// enabled-flag wall, same as iMessage.
    pub async fn post_prompt(&self, action: &str, approval_id: &str, code: &str) {
        if !self.config.is_enabled_for(ChannelId::Slack).unwrap_or(false) {
            return;
        }
        // No post-channel configured ⇒ posting to `channel:""` is a guaranteed
        // `channel_not_found` every tick (plus a keychain token fetch per doomed
        // tick + log spam). Early-return instead. The single missing-channel
        // warning is logged once at boot (see the orchestrator's Slack adapter
        // build); here we just no-op.
        if self.channel.is_empty() {
            return;
        }
        let text = slack_prompt_text(action, code);
        if let Err(e) = self
            .transport
            .post_message(&self.channel, &text, approval_id)
            .await
        {
            tracing::warn!(approval_id = %approval_id, error = %e, "slack approval prompt post failed");
        }
    }

    /// Dispatch one inbound Slack event. The closed set guarantees the only
    /// effects are: (1) resolve a known approval by id (button), or (2)
    /// validate-and-consume a pairing code (DM). NO config-mutation edge
    /// (MC-6). The enabled-flag wall applies — a disabled channel does zero
    /// inbound work.
    pub async fn handle_event(&self, event: &SlackInboundEvent) {
        if !self.config.is_enabled_for(ChannelId::Slack).unwrap_or(false) {
            return;
        }
        match event {
            SlackInboundEvent::ButtonInteraction {
                action_id,
                value,
                user,
            } => {
                self.handle_button(action_id, value, user).await;
            }
            SlackInboundEvent::PairingDm { user, text } => {
                // The ONLY inbound-reachable allowlist bind (MC-13): a
                // constant-time match of the locally-minted Slack code binds
                // this member id. A wrong code (or no active code) binds
                // nothing; a bare member id with no code never reaches a setter
                // (it parses to `Ignore` upstream / fails the pairing match).
                let _outcome: PairingOutcome = self
                    .config
                    .validate_and_consume_pairing_code_for(ChannelId::Slack, user, text)
                    .unwrap_or(PairingOutcome::Rejected);
            }
            SlackInboundEvent::Ignore => {}
        }
    }

    /// Resolve the approval named by a button's `value` (the `approval_id`),
    /// clicked by `user` (the Slack member id). Approve vs Deny is read from the
    /// `action_id`. A bogus / already-resolved `approval_id` no-ops via the
    /// untouched first-writer-wins guard — no panic, nothing resolves, no second
    /// event (MC-12 edge).
    ///
    /// BEFORE resolving, the two trust gates the iMessage inbound path enforces
    /// are applied here too (the Slack button path must NOT be a back door
    /// around pairing + eligibility):
    ///
    /// 1. **Allowlist (SC-7 wall):** the clicker must be on the Slack channel's
    ///    allowlist (the paired approver). A non-allowlisted member of the
    ///    configured channel clicking a button is a silent no-op — no resolve,
    ///    no event. The allowlist is the whole point of pairing; without this
    ///    check it is inert on Slack (any channel member could resolve).
    /// 2. **Eligibility (MC-7):** the named `approval_id` must be a pending,
    ///    eligible system-level row ([`ApprovalCore::is_id_eligible_pending`]).
    ///    This excludes `ws.method:`-prefixed blocking-gate rows (high-risk
    ///    methods the user never acked) and session-owned (`client_id: Some`)
    ///    rows — exactly like the iMessage path. A crafted `block_actions`
    ///    payload whose `value` names such a row resolves NOTHING.
    async fn handle_button(&self, action_id: &str, approval_id: &str, user: &str) {
        let resolution = if action_id == APPROVE_ACTION_ID {
            APPROVE
        } else if action_id == DENY_ACTION_ID {
            DENY
        } else {
            // Unknown action_id — not one of our two buttons. Ignore.
            return;
        };
        // Gate 1 — allowlist (SC-7). Drop a non-allowlisted clicker BEFORE any
        // resolve. `is_allowlisted_for` errs only on a malformed config file;
        // treat an error as "not allowlisted" (fail closed).
        if !self
            .config
            .is_allowlisted_for(ChannelId::Slack, user)
            .unwrap_or(false)
        {
            return;
        }
        // Gate 2 — eligibility (MC-7). Only a pending, eligible system-level row
        // is resolvable from a channel transport. A `ws.method:` blocking-gate
        // row or a session-owned (`client_id: Some`) row is NOT eligible, so a
        // Slack button can never resolve one.
        if !self.core.is_id_eligible_pending(approval_id).await {
            return;
        }
        // Past both gates: resolve by approval_id directly through the
        // channel-agnostic core. The core's first-writer-wins guard makes an
        // already-resolved id a safe no-op (returns Error/StillPending, emits
        // nothing). `SLACK_PRINCIPAL` is the audit principal.
        let _ = self
            .core
            .resolve(SLACK_PRINCIPAL, approval_id, resolution)
            .await;
    }

    /// The shared core (so the fan-out coordinator can query eligibility
    /// through the same `Arc<HostState>`).
    pub fn core(&self) -> &ApprovalCore {
        &self.core
    }
}

/// The Slack adapter implements the channel-agnostic [`InboundChannel`] seam:
/// it names [`ChannelId::Slack`] and OWNS its push-based inbound loop, pulling
/// events off the [`SlackTransport`] and dispatching each. UNCONDITIONAL — no
/// cfg gate (MC-11).
///
/// Note on the `sink`: like the iMessage adapter, the Slack adapter is its own
/// delivery path. Its inbound is button-clicks (resolve by `approval_id`) and
/// pairing DMs (the code primitive) — neither maps to the `handle_id`+`body`
/// `InboundMessage` sink shape, so the passed sink is unused here (the seam's
/// `sink` param exists for a hypothetical adapter whose delivery is externally
/// supplied; Slack, like iMessage, delivers internally and honestly does not
/// fake an `InboundMessage`).
#[async_trait]
impl InboundChannel for SlackAdapter {
    /// This adapter always serves the Slack channel.
    fn channel(&self) -> ChannelId {
        ChannelId::Slack
    }

    /// Pump inbound events until cancelled or until the transport reports the
    /// source is exhausted (`next_event` returns `None`).
    ///
    /// The loop exits as soon as `cancel` is observed set, without waiting for
    /// the next event. If the cancel signal's sender side goes away, the loop
    /// keeps serving events but stops polling the signal: a closed signal
    /// resolves immediately on every poll, and polling it forever would turn
    /// this `select!` into a busy loop.
    async fn run(&self, _sink: &dyn InboundSink, mut cancel: CancelSignal) {
        // Cleared once the cancel signal reports that it can never change
        // again (assumes `CancelSignal::changed` yields `Err` on a closed
        // sender, as `tokio::sync::watch::Receiver` does).
        let mut cancel_open = true;
        loop {
            tokio::select! {
                changed = cancel.changed(), if cancel_open => {
                    // Honor a set flag first, even when it arrives together
                    // with the close notification.
                    if *cancel.borrow() {
                        break;
                    }
                    if changed.is_err() {
                        cancel_open = false;
                    }
                }
                event = self.transport.next_event() => {
                    match event {
                        Some(ev) => self.handle_event(&ev).await,
                        None => break, // source exhausted / closed
                    }
                }
            }
        }
    }
}

/// Build the Slack prompt text: the action summary plus the shared fan-out
/// code (MC-8: the SAME code that iMessage's text grammar carries, so both
/// sinks render one shared code minted once). The Slack APPROVE/DENY mechanism
/// is the buttons (value = approval_id); the code in the text is for fan-out
/// equality + human readability, not a resolve grammar.
///
/// `action` and `code` are escaped for Slack mrkdwn before interpolation
/// (`&` → `&amp;`, `<` → `&lt;`, `>` → `&gt;`). Without that, an action summary
/// containing `<!channel>` or `<https://…|label>` would render as a mention or
/// a link inside an approval prompt. Inputs without those three characters
/// produce exactly the same text as before.
pub fn slack_prompt_text(action: &str, code: &str) -> String {
    let action = escape_slack_mrkdwn(action);
    let code = escape_slack_mrkdwn(code);
    format!("*Approval needed:* {action}\n(ref `{code}`) — use the buttons below.")
}

/// Escape the three characters Slack treats as control characters in message
/// text, so `raw` is displayed literally instead of being parsed as a
/// mention, link, or entity.
fn escape_slack_mrkdwn(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            other => escaped.push(other),
        }
    }
    escaped
}

// ===================================================================
// Production transport: Socket Mode (tokio-tungstenite) + Web API (reqwest)
// ===================================================================

/// Production [`SlackTransport`] — UNCONDITIONAL (no cfg gate, MC-11). Drives:
///
/// - **Outbound** `chat.postMessage` via `reqwest`, fetching the bot token
///   (`xoxb-`) from the OS keychain by its reference key at use-time (MC-9 — the
///   token is never read out of `messaging.json`).
/// - **Inbound** Socket Mode: opens a WebSocket via `apps.connections.open`
///   (using the app-level token `xapp-`, also keychain-fetched), reads frames,
///   ACKs each `events_api`/`interactive` envelope within the ~3s window, and
///   maps the two recognized shapes to [`SlackInboundEvent`]. The bot's own
///   echo is suppressed (`subtype == "bot_message"` / `bot_id` present).
///
/// The reconnect loop (resilient backoff, `apps.connections.open` per
/// connection, `disconnect`/`warning` handling) follows the Socket Mode
/// reference. `next_event` pulls from an internal channel the background
/// connection task feeds.
pub struct RealSlackTransport {
    /// Keychain handle used to resolve the token reference keys at use-time.
    secrets: car_secrets::SecretStore,
    /// Keychain reference key for the bot token (`xoxb-`) — a key name, not
    /// the bearer value.
    bot_token_key: String,
    /// Keychain reference key for the app-level token (`xapp-`), read when
    /// opening a Socket Mode connection.
    app_token_key: String,
    /// HTTP client for Slack Web API calls.
    http: reqwest::Client,
    /// Inbound events the background Socket Mode task feeds; `next_event`
    /// receives off this.
    inbound_rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<SlackInboundEvent>>,
    /// Sending half of the inbound channel. The Socket Mode connection loop
    /// forwards each recognized event through it.
    inbound_tx: tokio::sync::mpsc::Sender<SlackInboundEvent>,
}

impl RealSlackTransport {
    /// Build the production transport. `bot_token_key`/`app_token_key` are the
    /// keychain REFERENCE keys (from [`SlackTokenRefs`]) — the bearer values
    /// live only in the OS keychain (MC-9).
    ///
    /// The inbound channel is bounded at 64 events. Nothing is connected
    /// until [`RealSlackTransport::spawn_socket_loop`] is called.
    pub fn new(bot_token_key: impl Into<String>, app_token_key: impl Into<String>) -> Self {
        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(64);
        Self {
            secrets: car_secrets::SecretStore::new(),
            bot_token_key: bot_token_key.into(),
            app_token_key: app_token_key.into(),
            http: reqwest::Client::new(),
            inbound_rx: tokio::sync::Mutex::new(inbound_rx),
            inbound_tx,
        }
    }

    /// Open a Socket Mode WebSocket URL via `apps.connections.open` (uses the
    /// app-level token, keychain-fetched at use-time).
    ///
    /// # Errors
    ///
    /// Fails when the token cannot be fetched, the HTTP call or JSON decode
    /// fails, or the response is rejected by [`parse_socket_url_response`]
    /// (`ok` not true, or no usable `url`).
    async fn open_socket_url(&self) -> Result<String, String> {
        let app_token = fetch_secret_by_ref(&self.secrets, &self.app_token_key)?;
        let resp: serde_json::Value = self
            .http
            .post("https://slack.com/api/apps.connections.open")
            .bearer_auth(app_token)
            .header("Content-Length", "0")
            .send()
            .await
            .map_err(|e| format!("apps.connections.open: {e}"))?
            .json()
            .await
            .map_err(|e| format!("apps.connections.open decode: {e}"))?;
        parse_socket_url_response(&resp)
    }

    /// Sleep for `delay`, returning early only when `cancel` is observed set.
    ///
    /// A change that leaves the flag unset does not cut the sleep short. If
    /// the cancel signal's sender side is gone, the signal is no longer polled
    /// and the full remaining delay is slept. A closed signal resolves
    /// immediately, so racing it against the timer would silently skip the
    /// backoff (assumes `CancelSignal::changed` yields `Err` on a closed
    /// sender, as `tokio::sync::watch::Receiver` does).
    async fn sleep_or_cancel(delay: std::time::Duration, cancel: &mut CancelSignal) {
        let sleep = tokio::time::sleep(delay);
        tokio::pin!(sleep);
        let mut cancel_open = true;
        loop {
            tokio::select! {
                _ = &mut sleep => return,
                changed = cancel.changed(), if cancel_open => {
                    if *cancel.borrow() {
                        return;
                    }
                    if changed.is_err() {
                        cancel_open = false;
                    }
                }
            }
        }
    }

    /// Spawn the background Socket Mode reconnect loop. Each connection reads
    /// frames, ACKs, maps the two recognized shapes, and forwards them to
    /// `inbound_tx`. Runs until `cancel` flips. Per the reference: resilient
    /// backoff, one `apps.connections.open` per connection, `disconnect`/error
    /// → reconnect.
    ///
    /// The cancel flag is checked both before opening a connection and right
    /// after one ends, so a shutdown that arrives mid-connection is not
    /// delayed by a backoff sleep.
    pub fn spawn_socket_loop(self: std::sync::Arc<Self>, mut cancel: CancelSignal) {
        tokio::spawn(async move {
            // A connection that stayed up at least this long is treated as
            // "healthy" — only then is the backoff reset to the base delay.
            // Resetting on EVERY successful open would let a connection that
            // fails instantly clear the backoff, so a persistently broken
            // endpoint would produce a tight open→fail→open loop instead of
            // escalating. The threshold sits comfortably above the Socket Mode
            // ACK/hello round-trip.
            const HEALTHY_UP: std::time::Duration = std::time::Duration::from_secs(5);
            const BASE_BACKOFF: std::time::Duration = std::time::Duration::from_secs(1);
            const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(30);

            let mut backoff = BASE_BACKOFF;
            loop {
                if *cancel.borrow() {
                    break;
                }
                let healthy = match self.open_socket_url().await {
                    Ok(url) => {
                        let started = std::time::Instant::now();
                        if let Err(e) = self.run_one_connection(&url, &mut cancel).await {
                            tracing::warn!(error = %e, "slack socket connection ended");
                        }
                        started.elapsed() >= HEALTHY_UP
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "slack apps.connections.open failed");
                        false
                    }
                };
                // `run_one_connection` may have returned BECAUSE of a cancel,
                // which consumes the change notification. Re-check the flag
                // here; otherwise the sleep below would run its full course
                // (up to `MAX_BACKOFF`) before the shutdown is noticed.
                if *cancel.borrow() {
                    break;
                }
                if healthy {
                    // The connection stayed up — reset and reconnect at once.
                    backoff = BASE_BACKOFF;
                    continue;
                }
                // Failed to open, or fell over fast — wait before the next
                // attempt so we don't spin, then escalate the delay.
                Self::sleep_or_cancel(backoff, &mut cancel).await;
                backoff = (backoff * 2).min(MAX_BACKOFF);
            }
        });
    }

    /// Run a single Socket Mode WebSocket connection: ACK every
    /// `events_api`/`interactive`/`slash_commands` envelope, map the two
    /// recognized shapes, and forward to the inbound channel. Returns on
    /// disconnect/close/error/cancel.
    ///
    /// # Errors
    ///
    /// Returns `"ws connect: …"` when the WebSocket handshake fails and
    /// `"ws read: …"` when reading a frame fails. A clean close, a
    /// `disconnect` frame, end of stream, and cancellation all return
    /// `Ok(())`.
    async fn run_one_connection(
        &self,
        wss_url: &str,
        cancel: &mut CancelSignal,
    ) -> Result<(), String> {
        use futures_util::{SinkExt, StreamExt};
        use tokio_tungstenite::{connect_async, tungstenite::Message};

        let (ws_stream, _resp) = connect_async(wss_url)
            .await
            .map_err(|e| format!("ws connect: {e}"))?;
        let (mut write, mut read) = ws_stream.split();

        // Cleared once the cancel signal reports its sender is gone; a closed
        // signal resolves immediately and would otherwise spin this loop.
        let mut cancel_open = true;
        loop {
            tokio::select! {
                changed = cancel.changed(), if cancel_open => {
                    if *cancel.borrow() { return Ok(()); }
                    if changed.is_err() { cancel_open = false; }
                }
                frame = read.next() => {
                    let Some(frame) = frame else { return Ok(()); };
                    let msg = frame.map_err(|e| format!("ws read: {e}"))?;
                    let text = match msg {
                        Message::Text(t) => t,
                        Message::Close(_) => return Ok(()),
                        _ => continue,
                    };
                    // Frames that are not valid JSON are skipped.
                    let envelope: serde_json::Value = match serde_json::from_str(&text) {
                        Ok(v) => v,
                        Err(_) => continue,
                    };
                    let ev_type = envelope["type"].as_str().unwrap_or("");
                    // ACK within ~3s — before any business logic. The frame
                    // shape (`{envelope_id}`) is built by the shared pure
                    // `build_ack_frame` the wire-parse gate asserts.
                    if matches!(ev_type, "events_api" | "interactive" | "slash_commands") {
                        if let Some(ack) = build_ack_frame(&envelope) {
                            // A failed ACK is logged, not fatal: the event is
                            // still forwarded below, and a broken socket will
                            // surface as a read error on the next frame.
                            if let Err(e) = write.send(Message::Text(ack.to_string().into())).await {
                                tracing::warn!(error = %e, "slack socket ACK send failed");
                            }
                        }
                    }
                    if ev_type == "disconnect" {
                        return Ok(()); // reconnect in the outer loop
                    }
                    // Route through the SINGLE shared wire parser. Only the two
                    // closed shapes are forwarded; `Ignore` (bot echo, non-im,
                    // unknown action, hello) is dropped here (MC-6 boundary).
                    match parse_socket_frame(&envelope) {
                        ev @ (SlackInboundEvent::ButtonInteraction { .. }
                        | SlackInboundEvent::PairingDm { .. }) => {
                            let _ = self.inbound_tx.send(ev).await;
                        }
                        SlackInboundEvent::Ignore => {}
                    }
                }
            }
        }
    }
}

/// Extract the WebSocket URL from an `apps.connections.open` response body.
///
/// Pure, so the reconnect-spin guard can be unit-tested without a live HTTP
/// call. Two shapes are rejected with `Err` (never `Ok("")`):
///
/// - `ok` is anything other than `true` — the message carries the response's
///   `error` string, or `unknown` when there is none;
/// - `ok` is `true` but `url` is missing, not a string, or empty.
///
/// Rejecting the empty URL matters: `Ok("")` would take the success arm of
/// [`RealSlackTransport::spawn_socket_loop`], and `connect_async("")` would
/// then fail instantly — an open→fail→open spin. As an `Err` it goes through
/// the backoff/sleep arm instead.
pub fn parse_socket_url_response(resp: &serde_json::Value) -> Result<String, String> {
    let accepted = matches!(resp["ok"].as_bool(), Some(true));
    if !accepted {
        let reason = resp["error"].as_str().unwrap_or("unknown");
        return Err(format!("apps.connections.open failed: {reason}"));
    }

    match resp["url"].as_str() {
        Some(url) if !url.is_empty() => Ok(url.to_owned()),
        _ => Err("apps.connections.open returned ok with no url".to_string()),
    }
}

/// Construct the Socket Mode acknowledgement for `envelope`.
///
/// The ACK is the minimal, no-response-payload form: a JSON object holding
/// ONLY the envelope's own `envelope_id`. Being a pure function, it lets the
/// real transport and the wire-parse tests agree on exactly one frame shape.
///
/// Returns `None` when `envelope_id` is absent or not a string — i.e. a frame
/// that needs no ACK (for example `hello` or `disconnect`).
pub fn build_ack_frame(envelope: &serde_json::Value) -> Option<serde_json::Value> {
    envelope["envelope_id"]
        .as_str()
        .map(|id| serde_json::json!({ "envelope_id": id }))
}

/// Turn a top-level Socket Mode envelope (the JSON of a WS text frame) into a
/// [`SlackInboundEvent`].
///
/// This is the SINGLE wire-parse entry point shared by the real Socket Mode
/// loop and the tests. Dispatch is on the envelope's `type`:
///
/// * `interactive` → the payload is handed to [`parse_interactive`];
/// * `events_api` → the payload is handed to [`parse_events_api`];
/// * anything else (including a missing or non-string `type`) →
///   [`SlackInboundEvent::Ignore`].
///
/// A sub-parser returning `None` (the bot's own echo, a non-DM channel
/// message, an unknown action, …) also collapses to `Ignore`.
///
/// The closed-set output is the MC-6 anti-injection boundary BY CONSTRUCTION:
/// a hostile envelope — even one whose DM body is literally a config-mutation
/// payload — can only ever yield a `ButtonInteraction`, a `PairingDm`, or
/// `Ignore`. There is NO arm that manufactures a config mutation.
pub fn parse_socket_frame(envelope: &serde_json::Value) -> SlackInboundEvent {
    // Indexing a missing key yields JSON null, so this borrow never panics.
    let payload = &envelope["payload"];
    let parsed = match envelope["type"].as_str() {
        Some("interactive") => parse_interactive(payload),
        Some("events_api") => parse_events_api(payload),
        _ => None,
    };
    parsed.unwrap_or(SlackInboundEvent::Ignore)
}

/// Decode a Socket Mode `interactive` payload into a
/// [`SlackInboundEvent::ButtonInteraction`].
///
/// Returns `None` unless all of the following hold:
///
/// * `type` is the string `block_actions`;
/// * `actions` is a non-empty array (only its FIRST element is inspected);
/// * that action's `action_id` is `APPROVE_ACTION_ID` or `DENY_ACTION_ID`;
/// * that action carries a string `value`.
///
/// The clicking user is read from `user.id`; when it is absent or not a
/// string the event is still produced, with an empty `user`.
///
/// `pub` so the wire-parse gate (`mc_slack_wire_parse`) drives the REAL
/// parser, not a pre-built enum — the MC-6 boundary lives in this code.
pub fn parse_interactive(payload: &serde_json::Value) -> Option<SlackInboundEvent> {
    let is_block_actions = payload["type"].as_str()? == "block_actions";
    if !is_block_actions {
        return None;
    }

    let first_action = payload["actions"]
        .as_array()
        .and_then(|actions| actions.first())?;

    // Anything other than our two buttons is not ours to interpret.
    let action_id = first_action["action_id"].as_str()?.to_string();
    let is_known_button = action_id == APPROVE_ACTION_ID || action_id == DENY_ACTION_ID;
    if !is_known_button {
        return None;
    }

    let value = first_action["value"].as_str()?.to_string();
    let user = payload["user"]["id"]
        .as_str()
        .map(str::to_owned)
        .unwrap_or_default();

    Some(SlackInboundEvent::ButtonInteraction {
        action_id,
        value,
        user,
    })
}

/// Decode a Socket Mode `events_api` payload into a
/// [`SlackInboundEvent::PairingDm`].
///
/// Returns `None` unless the payload's `event` is a member DM, meaning:
///
/// * `type` is `message` and `channel_type` is `im` (only DMs to the bot);
/// * it is not the bot's own echo — `subtype` is not `bot_message` and no
///   `bot_id` key is present;
/// * `user` is a non-empty string;
/// * `text`, after trimming surrounding whitespace, is non-empty.
///
/// The resulting `text` is the trimmed message body.
///
/// `pub` so the wire-parse + MC-6 gates exercise the REAL parser: a non-`im`
/// channel message or a bot echo yields `None` ⇒ `Ignore`.
pub fn parse_events_api(payload: &serde_json::Value) -> Option<SlackInboundEvent> {
    let event = &payload["event"];

    let is_message = event["type"].as_str() == Some("message");
    let is_direct = event["channel_type"].as_str() == Some("im");
    // Either marker identifies a message authored by a bot (our own echo).
    let is_bot_echo =
        event["subtype"].as_str() == Some("bot_message") || event.get("bot_id").is_some();
    if !is_message || !is_direct || is_bot_echo {
        return None;
    }

    let user = event["user"].as_str().filter(|u| !u.is_empty())?;
    let text = event["text"]
        .as_str()
        .map(str::trim)
        .filter(|t| !t.is_empty())?;

    Some(SlackInboundEvent::PairingDm {
        user: user.to_owned(),
        text: text.to_owned(),
    })
}

#[async_trait]
impl SlackTransport for RealSlackTransport {
    /// Post an approval message to `channel` via Slack's `chat.postMessage`.
    ///
    /// The request body comes from `build_block_kit_message(text, approval_id)`
    /// with the `channel` field added on top.
    ///
    /// Returns the `ts` string from Slack's response, or an empty string when
    /// the response is `ok` but carries no string `ts`.
    ///
    /// # Errors
    ///
    /// * the bot token cannot be fetched from the secret store;
    /// * the HTTP request fails (`chat.postMessage: …`);
    /// * the response body is not decodable JSON (`chat.postMessage decode: …`);
    /// * Slack answers with `ok` other than `true` (`chat.postMessage failed: …`,
    ///   carrying Slack's `error` string or `"unknown"`).
    async fn post_message(
        &self,
        channel: &str,
        text: &str,
        approval_id: &str,
    ) -> Result<String, String> {
        // MC-9: the bot token is looked up by ref at use-time, not cached here.
        let bot_token = fetch_secret_by_ref(&self.secrets, &self.bot_token_key)?;

        let mut body = build_block_kit_message(text, approval_id);
        body["channel"] = serde_json::Value::String(channel.to_owned());

        let http_response = self
            .http
            .post("https://slack.com/api/chat.postMessage")
            .bearer_auth(bot_token)
            .json(&body)
            .send()
            .await
            .map_err(|e| format!("chat.postMessage: {e}"))?;
        let reply: serde_json::Value = http_response
            .json()
            .await
            .map_err(|e| format!("chat.postMessage decode: {e}"))?;

        // Success is decided by the `ok` flag in the decoded body.
        if reply["ok"].as_bool() == Some(true) {
            let ts = reply["ts"].as_str().unwrap_or("");
            return Ok(ts.to_string());
        }

        let reason = reply["error"].as_str().unwrap_or("unknown");
        Err(format!("chat.postMessage failed: {reason}"))
    }

    /// Wait for the next inbound event forwarded by the socket loop.
    ///
    /// Locks the shared receiver and awaits its `recv()`, returning whatever
    /// that yields.
    async fn next_event(&self) -> Option<SlackInboundEvent> {
        let mut receiver = self.inbound_rx.lock().await;
        receiver.recv().await
    }
}