Skip to main content

polyc_rpc_client/
lib.rs

1//! Thin connectrpc client over the generated
2//! [`polyc_proto::proto::polychrome::agent::v1::AgentServiceClient`].
3//!
4//! Both the operator CLI (`polychrome send`) and the reference-edge receiver dial the
5//! external-facing `AgentService` with the same Connect-streaming dance: build
6//! a connect-rust client, send one [`AgentRequest`], drain the
7//! [`AgentResponse`](polyc_proto::proto::polychrome::agent::v1::AgentResponse)
8//! stream, and render each non-empty content block to user-visible text. This
9//! crate is the single home for that logic so the two call sites can't drift.
10//!
11//! RPC-client concerns deliberately live here rather than in
12//! `polyc-agent` (the turn-loop crate, the wrong layer for a transport
13//! client).
14//!
//! [`AgentDialer::run_turn`] collects the whole turn into a single string and
//! returns it at end-of-turn; [`AgentDialer::run_turn_streaming`] instead
//! yields incremental [`TurnEvent`]s for live surfaces (e.g. Slack
//! `chat.appendStream`), built on the same client.
18
19#![forbid(unsafe_code)]
20#![warn(missing_docs)]
21
22pub mod edge;
23pub use edge::{EdgeAdapter, build_attribution};
24
25use std::sync::Arc;
26
27use connectrpc::client::{ClientConfig, HttpClient};
28use futures::Stream;
29use polyc_agent::text_message;
30use polyc_proto::proto::polychrome::agent::v1::{
31    AgentEnd, AgentRequest, AgentServiceClient, AgentStart, ClassifyRequest, Message,
32    ParticipantMessage, Verdict, agent_response, content, thought_summary_content,
33    tool_call_content,
34};
35use polyc_proto::proto::polychrome::approval::v1::{
36    ApprovalResponseRequest, ApprovalServiceClient,
37};
38use polyc_proto::proto::polychrome::persona::v1::{
39    CompleteLinkRequest, DescribeRequest, LinkOutcome, PersonaServiceClient, StartDeepLinkRequest,
40    StartLinkRequest,
41};
42
/// Errors an agent dial can produce.
///
/// `InvalidAddress` and `Tls` are local failures raised while a dialer is
/// being constructed; `Connect` wraps whatever the Connect client reports
/// once an RPC is actually attempted.
#[derive(Debug, thiserror::Error)]
pub enum DialError {
    /// Could not parse the configured agent address as a URI.
    #[error("invalid agent address {addr:?}: {source}")]
    InvalidAddress {
        /// The address string that failed to parse.
        addr: String,
        /// The underlying URI parse error.
        #[source]
        source: http::uri::InvalidUri,
    },
    /// Building the TLS client for an `https://` endpoint failed (e.g. no
    /// process-default crypto provider). The dial fails closed rather than
    /// silently downgrading to plaintext. Carries the underlying error
    /// rendered to a string.
    #[error("tls setup failed for agent address: {0}")]
    Tls(String),
    /// Connect-level error from a control-plane call: the `AgentService`
    /// stream as well as the unary approval / persona RPCs made through the
    /// sibling dialers. The `#[from]` conversion is what lets those call
    /// sites use `?` directly.
    #[error(transparent)]
    Connect(#[from] connectrpc::ConnectError),
}
64
65impl DialError {
66    /// The Connect error code, when this is a transport-level Connect error
67    /// (`None` for local address/TLS-setup failures).
68    #[must_use]
69    pub const fn code(&self) -> Option<connectrpc::ErrorCode> {
70        match self {
71            Self::Connect(e) => Some(e.code),
72            Self::InvalidAddress { .. } | Self::Tls(_) => None,
73        }
74    }
75
76    /// Whether retrying the call could plausibly succeed. Only transient
77    /// transport conditions are retryable; terminal codes
78    /// (`InvalidArgument`, `Unauthenticated`, `NotFound`, …) and local
79    /// address/TLS failures are not. Edges use this to decide whether to ask
80    /// the source platform to redeliver (retryable) or to drop / 4xx
81    /// (terminal — redelivery would loop forever).
82    #[must_use]
83    pub const fn is_retryable(&self) -> bool {
84        matches!(
85            self.code(),
86            Some(
87                connectrpc::ErrorCode::Unavailable
88                    | connectrpc::ErrorCode::DeadlineExceeded
89                    | connectrpc::ErrorCode::ResourceExhausted
90                    | connectrpc::ErrorCode::Aborted
91            )
92        )
93    }
94}
95
/// Default per-call deadline for an agent turn (emitted as `Connect-Timeout-Ms`
/// on every dial). Turns can be long (tool loops, slow providers), so this is
/// generous; edges add their own outer `tokio::time::timeout` for defence.
///
/// Installed as the default timeout of the client built by
/// `AgentDialer::new`, so it also bounds `AgentDialer::should_respond`, which
/// goes through that same client.
const AGENT_DIAL_TIMEOUT: std::time::Duration = std::time::Duration::from_mins(3);

/// Default per-call deadline for the short control-plane RPCs (approval
/// responses, persona lookups/ceremonies). These never run a turn.
///
/// Installed by `ApprovalDialer::new` and `PersonaDialer::new`.
const CONTROL_DIAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
104
105/// Build the Connect HTTP transport for `uri`, honoring its scheme: `https`
106/// dials over TLS (OS trust store); anything else (incl. a scheme-less
107/// `host:port`) uses plaintext. An `https` address is **never** silently
108/// downgraded — a TLS build failure surfaces as [`DialError::Tls`].
109fn http_client_for(uri: &http::Uri) -> Result<HttpClient, DialError> {
110    if uri.scheme_str() == Some("https") {
111        use rustls_platform_verifier::ConfigVerifierExt;
112        let tls = rustls::ClientConfig::with_platform_verifier()
113            .map_err(|e| DialError::Tls(e.to_string()))?;
114        Ok(HttpClient::with_tls(std::sync::Arc::new(tls)))
115    } else {
116        Ok(HttpClient::plaintext())
117    }
118}
119
120/// Per-call options carrying the active span's W3C `traceparent`, so the
121/// control-plane handler re-parents on this turn's span instead of starting a
122/// fresh trace. Cheap when no propagator is installed (the global getter is a
123/// no-op and no header is set). The per-call deadline comes from the client's
124/// `with_default_timeout`, so it need not be repeated here.
125fn traced_options() -> connectrpc::client::CallOptions {
126    let mut headers = http::HeaderMap::new();
127    polyc_runtime::propagation::inject_current_span_into(&mut headers);
128    connectrpc::client::CallOptions::default()
129        .with_headers(headers.into_iter().filter_map(|(n, v)| n.map(|n| (n, v))))
130}
131
/// Incremental event emitted while a turn streams from the control plane.
///
/// Unlike [`AgentDialer::run_turn`], which folds the whole turn into one
/// string, the streaming API surfaces each meaningful step as it arrives so a
/// live surface (e.g. Slack `chat.appendStream`) can update in place.
///
/// `ApprovalPending` and `HandoffStarted` are derived from the terminal
/// `AgentEnd` envelope, so they can only appear at the tail of a stream,
/// immediately ahead of `Done`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TurnEvent {
    /// Incremental assistant answer text (model/assistant role).
    TextDelta(String),
    /// A tool call has started, named for a user-visible "thinking step".
    ToolStarted {
        /// The tool/function name, or the call id when the name is absent.
        name: String,
    },
    /// The turn paused before executing a tool that requires human approval.
    /// Emitted (one per pending call) from the terminal `AgentEnd` just before
    /// [`TurnEvent::Done`]. The caller submits a decision via
    /// `ApprovalService.Respond` (the THIN path) and re-drives the turn.
    ApprovalPending {
        /// Tool-call id == the approval `request_id` to answer.
        request_id: String,
        /// The tool/function name awaiting approval. Raw machine identifier;
        /// the field of record for trust/audit.
        tool_name: String,
        /// Human display label (MCP-style `title`) for the tool, for rendering
        /// in the approval prompt. May be empty; the surface then derives one
        /// from `tool_name`.
        title: String,
        /// Arguments JSON for the call.
        args_json: String,
    },
    /// The turn suspended to delegate to a sub-agent: the model invoked the
    /// reserved `__handoff_to` primitive. Surfaced (once) from the terminal
    /// `AgentEnd.handoff` just before [`TurnEvent::Done`]. The child
    /// conversation runs independently; the parent resumes when the child
    /// returns, so an edge can render "delegating…" rather than going silent.
    HandoffStarted {
        /// The child agent / planner chosen; empty selects the parent's
        /// default planner.
        child_agent_id: String,
        /// Free-form reason captured for operator visibility.
        reason: String,
    },
    /// Terminal event: the turn has ended and no further events follow.
    Done,
}
178
/// Reusable handle for dialing the polychrome control plane.
///
/// The generated client sits behind an [`Arc`], so cloning the dialer shares
/// that one client instead of building another.
#[derive(Clone)]
pub struct AgentDialer {
    /// Generated `AgentService` client over the transport chosen from the
    /// address scheme.
    client: Arc<AgentServiceClient<HttpClient>>,
}
184
185impl AgentDialer {
186    /// Build a dialer pointed at `addr` (expects `http://host:port`).
187    ///
188    /// # Errors
189    ///
190    /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
191    pub fn new(addr: &str) -> Result<Self, DialError> {
192        let uri = addr
193            .parse::<http::Uri>()
194            .map_err(|source| DialError::InvalidAddress {
195                addr: addr.to_owned(),
196                source,
197            })?;
198        let http = http_client_for(&uri)?;
199        let config = ClientConfig::new(uri).with_default_timeout(AGENT_DIAL_TIMEOUT);
200        let client = AgentServiceClient::new(http, config);
201        Ok(Self {
202            client: Arc::new(client),
203        })
204    }
205
    /// Build an [`ApprovalDialer`] for the SAME control-plane endpoint.
    /// `AgentService` and `ApprovalService` are served on one Connect port, so
    /// an approval client reuses the agent address.
    ///
    /// This is an associated function, not a method: it takes no `self` and
    /// simply forwards `addr` to [`ApprovalDialer::new`], so the caller passes
    /// the same address string it gave [`AgentDialer::new`].
    ///
    /// # Errors
    /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI, or
    /// [`DialError::Tls`] if TLS setup for an `https://` address fails.
    pub fn approval_dialer(addr: &str) -> Result<ApprovalDialer, DialError> {
        ApprovalDialer::new(addr)
    }
216
217    /// Run one turn against the control plane and collect the aggregated
218    /// text response.
219    ///
220    /// `conversation_id` is the stable id for the conversation (the Slack
221    /// adapter derives it from the thread; the CLI takes it as an argument).
222    /// `user_text` is the user message with any bot mention already stripped.
223    /// Returns the concatenated assistant text across every batch in the
224    /// response stream, or `Ok(String::new())` if the turn produced no text.
225    /// Non-text content variants (tool calls, tool results, thoughts) are
226    /// rendered as bracketed placeholders.
227    ///
228    /// # Errors
229    ///
230    /// Returns [`DialError::Connect`] for any transport/stream/encoding
231    /// error from the `AgentService` call.
232    pub async fn run_turn(
233        &self,
234        conversation_id: &str,
235        exec_id: &str,
236        user_text: &str,
237    ) -> Result<String, DialError> {
238        self.run_turn_with(conversation_id, exec_id, user_text, Attribution::default())
239            .await
240    }
241
242    /// Like [`run_turn`](Self::run_turn) but attributes the turn to a caller
243    /// (and participants). Non-streaming edges that resolve an identity via
244    /// [`EdgeAdapter::caller`] use this so their turns populate
245    /// `AgentStart.caller` (persona attribution) — the buffered analog of
246    /// [`run_turn_streaming_messages_with`](Self::run_turn_streaming_messages_with).
247    ///
248    /// # Errors
249    ///
250    /// Returns [`DialError::Connect`] for any transport/stream/encoding error.
251    pub async fn run_turn_with(
252        &self,
253        conversation_id: &str,
254        exec_id: &str,
255        user_text: &str,
256        attribution: Attribution,
257    ) -> Result<String, DialError> {
258        let request = build_request(
259            conversation_id,
260            exec_id,
261            vec![text_message("user", user_text)],
262            None,
263            attribution,
264        );
265
266        let mut stream = self
267            .client
268            .connect_with_options(request, traced_options())
269            .await?;
270        // The assistant's prose answer. Tool calls, tool results, and thoughts
271        // are intermediate scaffolding — they're aggregated separately and only
272        // surfaced when the turn produced no text at all (e.g. a turn that ends
273        // on a tool_call / approval pause), so a normal answer reads cleanly
274        // instead of "[tool_call:…]\n[tool_result:…]\nIt is 3pm."
275        let mut text_parts: Vec<String> = Vec::new();
276        let mut scaffolding: Vec<String> = Vec::new();
277        while let Some(view) = stream.message().await? {
278            let response = view.to_owned_message();
279            match response.r#type {
280                Some(agent_response::Type::Outputs(outputs)) => {
281                    for msg in outputs.messages {
282                        // The turn stream carries every message the turn
283                        // produced — including the tool-execution result, which
284                        // the control plane emits as a `tool`-role Text block
285                        // (the raw tool output, e.g. `{"now":"…"}`). Only the
286                        // `model`/`assistant` text is the user-facing answer;
287                        // tool-role text is intermediate data, not the reply.
288                        let is_assistant = matches!(msg.role.as_str(), "model" | "assistant");
289                        let Some(content_block) = msg.content.into_option() else {
290                            continue;
291                        };
292                        match content_block.r#type {
293                            Some(content::Type::Text(t)) if is_assistant && !t.text.is_empty() => {
294                                text_parts.push(t.text);
295                            }
296                            // Non-assistant text (tool result echo, user) is not
297                            // the reply — drop it entirely (not even fallback).
298                            Some(content::Type::Text(_)) => {}
299                            other => {
300                                if let Some(rendered) = render_content(other) {
301                                    scaffolding.push(rendered);
302                                }
303                            }
304                        }
305                    }
306                }
307                // An explicit AgentEnd ends the turn. The stream closing
308                // without one is an implicit end-of-turn; we still return
309                // whatever was aggregated.
310                Some(agent_response::Type::End(_)) => break,
311                None => {
312                    // Empty envelope — defensive; the control plane shouldn't
313                    // send these but the wire allows it. Ignore.
314                }
315            }
316        }
317        let reply = if text_parts.is_empty() {
318            scaffolding.join("\n")
319        } else {
320            text_parts.join("\n")
321        };
322        Ok(reply)
323    }
324
325    /// Run one turn against the control plane and stream each meaningful step
326    /// as a [`TurnEvent`], for live surfaces that update in place.
327    ///
328    /// The request is built identically to [`AgentDialer::run_turn`]; see that
329    /// method for the meaning of `conversation_id`, `exec_id`, and
330    /// `user_text`. The returned stream yields:
331    ///
332    /// - [`TurnEvent::TextDelta`] for each model/assistant-role text block,
333    /// - [`TurnEvent::ToolStarted`] when a tool call begins, and
334    /// - [`TurnEvent::Done`] once at end-of-turn, after which the stream ends.
335    ///
336    /// Tool-role result echoes and empty/non-textual blocks produce no event.
337    ///
338    /// # Errors
339    ///
340    /// The outer `Result` carries a [`DialError::Connect`] if opening the
341    /// stream fails. Each item is a `Result` so per-message transport/decode
342    /// errors surface inline without tearing down the whole stream type.
343    pub async fn run_turn_streaming(
344        &self,
345        conversation_id: &str,
346        exec_id: &str,
347        user_text: &str,
348    ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
349        self.run_turn_streaming_messages(
350            conversation_id,
351            exec_id,
352            vec![text_message("user", user_text)],
353        )
354        .await
355    }
356
357    /// Like [`Self::run_turn_streaming`] but takes a pre-built (e.g. attributed
358    /// multi-party) message list as the turn input.
359    ///
360    /// # Errors
361    ///
362    /// Returns [`DialError::Connect`] for any transport/stream/encoding error.
363    pub async fn run_turn_streaming_messages(
364        &self,
365        conversation_id: &str,
366        exec_id: &str,
367        messages: Vec<Message>,
368    ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
369        self.run_turn_streaming_messages_with(
370            conversation_id,
371            exec_id,
372            messages,
373            None,
374            Attribution::default(),
375        )
376        .await
377    }
378
379    /// The full-fidelity variant: one method carries everything a turn's
380    /// request can — a settled inbound [`PaymentReceipt`] (the control plane
381    /// persists a signed `payment_receipt` event in the turn's atomic batch)
382    /// and the caller [`Attribution`] (resolved to durable personas and
383    /// recorded as `caller`/`participant` events). One method rather than a
384    /// matrix of variants, so a paid *and* attributed edge can't silently
385    /// drop one of the two.
386    ///
387    /// # Errors
388    ///
389    /// Returns [`DialError::Connect`] for any transport/stream/encoding error.
390    pub async fn run_turn_streaming_messages_with(
391        &self,
392        conversation_id: &str,
393        exec_id: &str,
394        messages: Vec<Message>,
395        payment_receipt: Option<PaymentReceipt>,
396        attribution: Attribution,
397    ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
398        let request = build_request(
399            conversation_id,
400            exec_id,
401            messages,
402            payment_receipt,
403            attribution,
404        );
405        self.run_turn_streaming_request(request).await
406    }
407
    /// Open the connect stream for one pre-built request and project the
    /// response envelopes into [`TurnEvent`]s — the single transport body
    /// every streaming variant shares.
    ///
    /// The RPC is opened before this function returns, so a failure to open
    /// it surfaces in the outer `Result`. Envelopes are then pulled one at a
    /// time as the returned stream is polled; a per-message error is yielded
    /// as an `Err` item and ends the stream.
    async fn run_turn_streaming_request(
        &self,
        request: AgentRequest,
    ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
        let mut stream = self
            .client
            .connect_with_options(request, traced_options())
            .await?;
        Ok(async_stream::try_stream! {
            while let Some(view) = stream.message().await? {
                let response = view.to_owned_message();
                match response.r#type {
                    Some(agent_response::Type::Outputs(outputs)) => {
                        // One envelope may batch several messages; each maps
                        // to at most one event.
                        for msg in outputs.messages {
                            if let Some(event) = message_to_event(msg) {
                                yield event;
                            }
                        }
                    }
                    Some(agent_response::Type::End(end)) => {
                        // The terminal envelope's extensions (pending approvals,
                        // a sub-agent handoff) surface before the terminal Done
                        // so a live surface can prompt / show "delegating…".
                        for event in events_from_end(*end) {
                            yield event;
                        }
                        // Ends the stream here, skipping the trailing `Done`
                        // below. NOTE(review): presumably `events_from_end`
                        // emits the terminal `Done` itself — confirm.
                        return;
                    }
                    // Empty envelope — defensive; ignore (mirrors `run_turn`).
                    None => {}
                }
            }
            // Stream closed without an explicit AgentEnd: still signal a
            // terminal event so the caller can finalise the live surface.
            yield TurnEvent::Done;
        })
    }
448
449    /// Ask the control plane's participation gate whether to reply to the
450    /// latest message of a (multi-party) thread. Returns `true` only on
451    /// `respond`; `notify` / `ignore` map to `false` (stay silent). The gate
452    /// runs a cheap classifier model server-side and never runs a turn.
453    ///
454    /// # Errors
455    ///
456    /// Returns [`DialError::Connect`] for any transport/encoding error.
457    pub async fn should_respond(
458        &self,
459        conversation_id: &str,
460        bot_name: &str,
461        transcript: Vec<ParticipantMessage>,
462    ) -> Result<bool, DialError> {
463        let request = ClassifyRequest {
464            conversation_id: conversation_id.to_owned(),
465            bot_name: bot_name.to_owned(),
466            transcript,
467            ..Default::default()
468        };
469        let resp = self
470            .client
471            .classify_with_options(request, traced_options())
472            .await?
473            .into_owned();
474        Ok(resp.verdict.to_i32() == Verdict::VERDICT_RESPOND as i32)
475    }
476}
477
/// The persisted outcome of an `ApprovalService.Respond` call.
///
/// On the THIN path the control plane is the signer: the caller submits an
/// UNSIGNED decision and the control plane appends a server-signed
/// `approval_response` event, returning the signature so the caller can show /
/// audit a verifiable outcome.
///
/// The fields are copied verbatim from the RPC reply by
/// [`ApprovalDialer::respond`]; nothing is verified client-side.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApprovalOutcome {
    /// `true` if a matching pending request existed and the response was
    /// persisted; `false` is the idempotent no-op (already answered / unknown).
    pub persisted: bool,
    /// Lowercase-hex ed25519 signature over the canonical response bytes.
    pub signature_hex: String,
    /// Lowercase-hex public key the signature verifies against.
    pub signed_by_hex: String,
}
494
/// Reusable handle for the control plane's `ApprovalService`.
///
/// The THIN human-in-the-loop path. Shares the `AgentService` endpoint — both
/// are served on one Connect port — so it is built from the same address.
/// The generated client sits behind an [`Arc`], so clones share it.
#[derive(Clone)]
pub struct ApprovalDialer {
    /// Generated `ApprovalService` client over the transport chosen from the
    /// address scheme.
    client: Arc<ApprovalServiceClient<HttpClient>>,
}
503
504impl ApprovalDialer {
505    /// Build a dialer pointed at `addr` (expects `http://host:port`).
506    ///
507    /// # Errors
508    /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
509    pub fn new(addr: &str) -> Result<Self, DialError> {
510        let uri = addr
511            .parse::<http::Uri>()
512            .map_err(|source| DialError::InvalidAddress {
513                addr: addr.to_owned(),
514                source,
515            })?;
516        let http = http_client_for(&uri)?;
517        let client = ApprovalServiceClient::new(
518            http,
519            ClientConfig::new(uri).with_default_timeout(CONTROL_DIAL_TIMEOUT),
520        );
521        Ok(Self {
522            client: Arc::new(client),
523        })
524    }
525
526    /// Submit a human's decision for a pending approval. The decision is
527    /// UNSIGNED — the control plane signs and persists it (THIN path) and
528    /// returns the signature. Idempotent: answering an already-decided or
529    /// unknown `request_id` returns `persisted: false`.
530    ///
531    /// # Errors
532    /// Returns [`DialError::Connect`] for any transport/encoding error.
533    pub async fn respond(
534        &self,
535        request_id: &str,
536        approved: bool,
537        reason: &str,
538        conversation_id: &str,
539    ) -> Result<ApprovalOutcome, DialError> {
540        let request = ApprovalResponseRequest {
541            request_id: request_id.to_owned(),
542            approved,
543            reason: reason.to_owned(),
544            conversation_id: conversation_id.to_owned(),
545            ..Default::default()
546        };
547        let reply = self
548            .client
549            .respond_with_options(request, traced_options())
550            .await?
551            .into_owned();
552        Ok(ApprovalOutcome {
553            persisted: reply.persisted,
554            signature_hex: reply.signature_hex,
555            signed_by_hex: reply.signed_by_hex,
556        })
557    }
558}
559
/// A freshly minted link-ceremony challenge: the code an edge must deliver
/// privately to the requesting user, plus its absolute expiry.
///
/// Note that the derived `Debug` output includes `code`, so do not
/// debug-print this value into logs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StartedLink {
    /// The single-use 6-digit code (a credential — never log or display it
    /// outside a private channel the user owns).
    pub code: String,
    /// Unix-ms after which the code is refused.
    pub expires_at_ms: u64,
    /// The persona the code is bound to.
    pub persona_id: String,
}
572
/// A freshly minted deep-link token: the opaque value an edge embeds in a
/// platform URL for a no-typing ceremony, plus its absolute expiry.
///
/// Note that the derived `Debug` output includes `token`, so do not
/// debug-print this value into logs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StartedDeepLink {
    /// The single-use, high-entropy token (a credential — only ever placed in
    /// a deep-link URL delivered to a private channel the user owns).
    pub token: String,
    /// Unix-ms after which the token is refused.
    pub expires_at_ms: u64,
    /// The persona the token is bound to.
    pub persona_id: String,
}
585
/// How a link-ceremony completion turned out, expressed in terms an edge can
/// render directly.
///
/// The proto outcomes `InvalidCode` and `Expired` are intentionally collapsed
/// into the single [`LinkCeremony::InvalidOrExpired`] variant: edges must
/// present them identically so that someone guessing codes cannot learn
/// whether a code was ever valid. The control plane's logs keep the
/// distinction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LinkCeremony {
    /// The identity is now bound to the persona (directly or by merge).
    Linked {
        /// Id of the persona that survives the link.
        persona_id: String,
    },
    /// The identity already resolved to that persona; the code was consumed.
    AlreadyLinked {
        /// Id of that persona.
        persona_id: String,
    },
    /// There was no usable code (never minted, already consumed, or expired).
    /// All three cases look the same to the user.
    InvalidOrExpired,
    /// The completing identity is locked out after repeated failures (or the
    /// deployment-wide backstop tripped). Rendered distinctly: "wait".
    Throttled,
    /// An unexpected/unspecified outcome — the edge should surface a generic
    /// failure rather than claim success.
    Failed,
}

impl LinkCeremony {
    /// User-facing text for this outcome, shared by every edge.
    ///
    /// Keeping the wording in one place means the security-relevant
    /// "invalid or expired" message cannot drift between surfaces (no oracle
    /// for a guesser). The phrasing is channel-neutral; an edge that needs
    /// surface-specific text can still match on the variant itself.
    #[must_use]
    pub const fn user_message(&self) -> &'static str {
        const LINKED: &str =
            "✅ Linked — this account now shares one Polychrome persona with your other channels.";
        const ALREADY_LINKED: &str =
            "✅ Already linked — this account was already on that persona.";
        const INVALID_OR_EXPIRED: &str =
            "That link is invalid or expired. Start a fresh one from your other channel and try again.";
        const THROTTLED: &str = "Too many attempts. Please wait a few minutes and try again.";
        const FAILED: &str = "Sorry — something went wrong completing that link. Please try again.";

        match self {
            Self::Linked { .. } => LINKED,
            Self::AlreadyLinked { .. } => ALREADY_LINKED,
            Self::InvalidOrExpired => INVALID_OR_EXPIRED,
            Self::Throttled => THROTTLED,
            Self::Failed => FAILED,
        }
    }
}
640
/// Reusable handle for the control plane's `PersonaService`.
///
/// The verified identity-linking ceremonies. Shares the `AgentService`
/// endpoint — all three services are served on one internal Connect port —
/// so it is built from the same address. The generated client sits behind an
/// [`Arc`], so clones share it.
#[derive(Clone)]
pub struct PersonaDialer {
    /// Generated `PersonaService` client over the transport chosen from the
    /// address scheme.
    client: Arc<PersonaServiceClient<HttpClient>>,
}
650
651impl PersonaDialer {
652    /// Build a dialer pointed at `addr` (expects `http://host:port`).
653    ///
654    /// # Errors
655    /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
656    pub fn new(addr: &str) -> Result<Self, DialError> {
657        let uri = addr
658            .parse::<http::Uri>()
659            .map_err(|source| DialError::InvalidAddress {
660                addr: addr.to_owned(),
661                source,
662            })?;
663        let http = http_client_for(&uri)?;
664        let client = PersonaServiceClient::new(
665            http,
666            ClientConfig::new(uri).with_default_timeout(CONTROL_DIAL_TIMEOUT),
667        );
668        Ok(Self {
669            client: Arc::new(client),
670        })
671    }
672
673    /// Mint a single-use link-ceremony code bound to `identity`'s persona
674    /// (provisioning one on first contact). The returned [`StartedLink::code`]
675    /// is a credential the caller must deliver only to a private channel the
676    /// user owns.
677    ///
678    /// # Errors
679    /// Returns [`DialError::Connect`] for any transport/encoding error.
680    pub async fn start_link(&self, identity: ExternalIdentity) -> Result<StartedLink, DialError> {
681        let request = StartLinkRequest {
682            identity: buffa::MessageField::some(identity),
683            ..Default::default()
684        };
685        let reply = self
686            .client
687            .start_link_with_options(request, traced_options())
688            .await?
689            .into_owned();
690        Ok(StartedLink {
691            code: reply.code,
692            expires_at_ms: reply.expires_at_ms,
693            persona_id: reply.persona_id,
694        })
695    }
696
697    /// Mint a high-entropy deep-link token bound to `identity`'s persona, for
698    /// a no-typing ceremony (the caller embeds [`StartedDeepLink::token`] in a
699    /// platform URL like `https://t.me/<bot>?start=<token>`). The token is a
700    /// credential — deliver the URL only to a private channel the user owns.
701    /// Completed via [`Self::complete_link`] from the target platform.
702    ///
703    /// # Errors
704    /// Returns [`DialError::Connect`] for any transport/encoding error.
705    pub async fn start_deeplink(
706        &self,
707        identity: ExternalIdentity,
708    ) -> Result<StartedDeepLink, DialError> {
709        let request = StartDeepLinkRequest {
710            identity: buffa::MessageField::some(identity),
711            ..Default::default()
712        };
713        let reply = self
714            .client
715            .start_deep_link_with_options(request, traced_options())
716            .await?
717            .into_owned();
718        Ok(StartedDeepLink {
719            token: reply.token,
720            expires_at_ms: reply.expires_at_ms,
721            persona_id: reply.persona_id,
722        })
723    }
724
725    /// Consume `code` from the channel being claimed, binding `identity` to
726    /// the minting persona (merging `identity`'s prior persona if it had one).
727    ///
728    /// # Errors
729    /// Returns [`DialError::Connect`] for any transport/encoding error.
730    pub async fn complete_link(
731        &self,
732        code: &str,
733        identity: ExternalIdentity,
734    ) -> Result<LinkCeremony, DialError> {
735        let request = CompleteLinkRequest {
736            code: code.to_owned(),
737            identity: buffa::MessageField::some(identity),
738            ..Default::default()
739        };
740        let reply = self
741            .client
742            .complete_link_with_options(request, traced_options())
743            .await?
744            .into_owned();
745        Ok(match reply.outcome.as_known() {
746            Some(LinkOutcome::Linked) => LinkCeremony::Linked {
747                persona_id: reply.persona_id,
748            },
749            Some(LinkOutcome::AlreadyLinked) => LinkCeremony::AlreadyLinked {
750                persona_id: reply.persona_id,
751            },
752            // Fused on purpose — see `LinkCeremony::InvalidOrExpired`.
753            Some(LinkOutcome::InvalidCode | LinkOutcome::Expired) => LinkCeremony::InvalidOrExpired,
754            Some(LinkOutcome::Throttled) => LinkCeremony::Throttled,
755            Some(LinkOutcome::Unspecified) | None => LinkCeremony::Failed,
756        })
757    }
758
759    /// Read-only: the profile the `identity` resolves to, as an edge-friendly
760    /// [`PersonaView`]. Returns an all-empty view for an identity not yet in
761    /// the directory; **never provisions** a persona.
762    ///
763    /// # Errors
764    /// Returns [`DialError::Connect`] for any transport/encoding error.
765    pub async fn describe(&self, identity: ExternalIdentity) -> Result<PersonaView, DialError> {
766        let request = DescribeRequest {
767            identity: buffa::MessageField::some(identity),
768            ..Default::default()
769        };
770        let reply = self
771            .client
772            .describe_with_options(request, traced_options())
773            .await?
774            .into_owned();
775        // Unknown identity → empty profile → an all-default view (the
776        // not-yet-claimed dashboard state).
777        let Some(profile) = reply.profile.into_option() else {
778            return Ok(PersonaView {
779                persona_id: reply.persona_id,
780                ..Default::default()
781            });
782        };
783        Ok(PersonaView {
784            persona_id: reply.persona_id,
785            status: profile.status,
786            identities: profile
787                .identities
788                .into_iter()
789                .map(|id| LinkedIdentity {
790                    provider: id.provider,
791                    external_id: id.external_id,
792                    display_name: id.display_name,
793                })
794                .collect(),
795        })
796    }
797}
798
/// A single external identity attached to a persona, shaped for dashboard
/// rendering (provider · display name · id).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LinkedIdentity {
    /// Namespace of the edge the identity belongs to ("slack", "telegram", …).
    pub provider: String,
    /// Stable id in the provider's own format.
    pub external_id: String,
    /// Most recently observed display name; for presentation only.
    pub display_name: String,
}
810
811/// An edge-friendly read of a persona, for surfaces like the App Home
812/// dashboard. Every field is empty when the queried identity is not in the
813/// directory (a not-yet-claimed user).
814#[derive(Debug, Clone, PartialEq, Eq, Default)]
815pub struct PersonaView {
816    /// The persona id; empty when the identity is unknown.
817    pub persona_id: String,
818    /// "provisional" | "linked" | "merged"; empty when unknown.
819    pub status: String,
820    /// Every identity currently linked to the persona.
821    pub identities: Vec<LinkedIdentity>,
822}
823
824/// A turn-input message, re-exported so callers can build attributed
825/// multi-party input for [`AgentDialer::run_turn_streaming_messages`] without
826/// depending on `polyc-proto`.
827pub use polyc_proto::proto::polychrome::agent::v1::Message as TurnMessage;
828/// The attributed transcript line type the participation gate consumes.
829/// Re-exported so callers build requests without importing `polyc-proto`.
830pub use polyc_proto::proto::polychrome::agent::v1::ParticipantMessage as GateMessage;
831/// A settled inbound payment receipt, re-exported so the public-edge layer can
832/// thread it into [`AgentDialer::run_turn_streaming_messages_with`]
833/// without depending on `polyc-proto` directly.
834pub use polyc_proto::proto::polychrome::agent::v1::PaymentReceipt;
835
836/// The external identity of a human observed at an edge (re-exported wire
837/// type, shared with the persona store records): the per-edge `EdgeAdapter`
838/// mapping produces these and the control plane resolves them to durable
839/// personas.
840pub use polyc_proto::proto::polychrome::persona::v1::ExternalIdentity;
841
842/// Caller attribution for one turn.
843///
844/// Carries the identity whose message triggered the turn (`caller`) and any
845/// other humans whose messages entered the turn's input (`participants`).
846/// Edges without caller identity pass the default.
847#[derive(Debug, Clone, Default)]
848pub struct Attribution {
849    /// The identity whose message triggered the turn.
850    pub caller: Option<ExternalIdentity>,
851    /// Other humans whose messages entered this turn's input.
852    pub participants: Vec<ExternalIdentity>,
853}
854
855/// Build an attributed user-role turn message rendered as `"<speaker>: text"`,
856/// so the model sees who said what in a multi-party thread.
857#[must_use]
858pub fn attributed_message(speaker: &str, text: &str) -> TurnMessage {
859    text_message("user", &format!("{speaker}: {text}"))
860}
861
862/// Build a plain user-role turn message (no speaker attribution) — used for
863/// 1:1 DMs where there is only one human in the conversation.
864#[must_use]
865pub fn user_message(text: &str) -> TurnMessage {
866    text_message("user", text)
867}
868
/// Build the documented **readable** namespaced conversation id,
/// `"{namespace}:{native_id}"`.
///
/// Each edge derives [`AgentRequest::conversation_id`] from whatever its
/// native conversation unit is (a thread, a ticket, an issue, a session). By
/// convention the id is a namespace prefix naming the edge family, then the
/// edge's own stable handle for the conversation — for example
/// `github:owner/repo#42`, `web:{session-uuid}` or `mcp:{caller-id}`. The
/// orchestration core treats the id as opaque (it is the event-log partition
/// key, `conv-{id}`); the shared *format* exists only so that edges stay
/// consistent and ids remain greppable.
///
/// When the native coordinate is unwieldy or sensitive — as on many chat
/// platforms — prefer [`hashed_conversation_id`], which collapses the
/// coordinate into a fixed-length opaque `UUIDv5` under a pinned namespace.
#[must_use]
pub fn namespaced_id(namespace: &str, native_id: &str) -> String {
    // One allocation sized for both halves plus the ':' separator.
    let mut id = String::with_capacity(namespace.len() + 1 + native_id.len());
    id.push_str(namespace);
    id.push(':');
    id.push_str(native_id);
    id
}
887
888/// Derive a stable, fixed-length conversation id by hashing `parts` into a
889/// `UUIDv5` under a pinned `namespace`.
890///
891/// This is the "opaque hash" namespacing policy: a deterministic,
892/// stateless-across-restarts id (no `native → id` table to keep) that is
893/// globally unique enough to key the event-log partition. `parts` are joined
894/// with `':'` before hashing, so a caller passing `["T1", "C1", "169…"]`
895/// hashes exactly `"T1:C1:169…"`.
896///
897/// The pinned `namespace` UUID **must never change** for a given edge — every
898/// existing conversation id for that edge depends on it. `polychrome-slack`
899/// uses this policy (its `UUIDv5` over `(team, channel, thread_ts)`); new edges
900/// pick their own frozen namespace.
901#[must_use]
902pub fn hashed_conversation_id(namespace: uuid::Uuid, parts: &[&str]) -> String {
903    let joined = parts.join(":");
904    uuid::Uuid::new_v5(&namespace, joined.as_bytes())
905        .hyphenated()
906        .to_string()
907}
908
909/// Collision-free variant of [`hashed_conversation_id`] for edges whose native
910/// parts can themselves contain `':'` (e.g. an email `Message-ID`, a URL, a
911/// path).
912///
913/// [`hashed_conversation_id`] joins parts with a bare `':'`, so `["a:b", "c"]`
914/// and `["a", "b:c"]` both hash `"a:b:c"` and collide. This helper instead
915/// **length-prefixes** each part (`"{len}:{part}"`), which is unambiguous
916/// regardless of any `':'` inside a part — the length says exactly how many
917/// bytes the part occupies, so distinct part boundaries always produce distinct
918/// hash inputs. Use it for any edge whose coordinate fields are not guaranteed
919/// `':'`-free.
920///
921/// Distinct from [`hashed_conversation_id`] precisely because the framing
922/// differs; the two do not agree for the same `parts`.
923#[must_use]
924pub fn framed_conversation_id(namespace: uuid::Uuid, parts: &[&str]) -> String {
925    let mut framed = String::new();
926    for p in parts {
927        framed.push_str(&p.len().to_string());
928        framed.push(':');
929        framed.push_str(p);
930    }
931    uuid::Uuid::new_v5(&namespace, framed.as_bytes())
932        .hyphenated()
933        .to_string()
934}
935
936/// Build the single [`AgentRequest`] that opens an `AgentService.connect`
937/// stream. Shared by [`AgentDialer::run_turn`] and
938/// [`AgentDialer::run_turn_streaming`] so the two paths can't drift.
939fn build_request(
940    conversation_id: &str,
941    exec_id: &str,
942    messages: Vec<Message>,
943    payment_receipt: Option<PaymentReceipt>,
944    attribution: Attribution,
945) -> AgentRequest {
946    AgentRequest {
947        conversation_id: conversation_id.to_owned(),
948        exec_id: exec_id.to_owned(),
949        start: buffa::MessageField::some(AgentStart {
950            agent_id: String::new(),
951            agent_config: Vec::new(),
952            messages,
953            payment_receipt: payment_receipt
954                .map_or_else(buffa::MessageField::none, buffa::MessageField::some),
955            caller: attribution
956                .caller
957                .map_or_else(buffa::MessageField::none, buffa::MessageField::some),
958            participants: attribution.participants,
959            ..Default::default()
960        }),
961        ..Default::default()
962    }
963}
964
965/// Project a terminal [`AgentEnd`] into the [`TurnEvent`]s a live surface sees
966/// at end-of-turn: one [`TurnEvent::ApprovalPending`] per paused tool call,
967/// then a [`TurnEvent::HandoffStarted`] if the turn delegated to a sub-agent,
968/// then the terminal [`TurnEvent::Done`].
969///
970/// Pure (no I/O) so the End-arm mapping can be unit-tested without a live
971/// stream, mirroring [`message_to_event`] for the Outputs arm.
972fn events_from_end(end: AgentEnd) -> Vec<TurnEvent> {
973    let mut events: Vec<TurnEvent> = end
974        .pending_approvals
975        .into_iter()
976        .map(|pa| TurnEvent::ApprovalPending {
977            request_id: pa.request_id,
978            tool_name: pa.tool_name,
979            title: pa.title,
980            args_json: pa.args_json,
981        })
982        .collect();
983    if let Some(h) = end.handoff.into_option() {
984        events.push(TurnEvent::HandoffStarted {
985            child_agent_id: h.child_agent_id,
986            reason: h.reason,
987        });
988    }
989    events.push(TurnEvent::Done);
990    events
991}
992
993/// Map one output [`Message`] to an optional [`TurnEvent`] for the streaming
994/// path.
995///
996/// Pure (no I/O) so the mapping can be unit-tested without a live stream:
997///
998/// - A tool-call content block, or any tool-role message, becomes a
999///   [`TurnEvent::ToolStarted`] named for the called function (falling back to
1000///   the call id when the function name is absent).
1001/// - A non-empty text block on a model/assistant-role message becomes a
1002///   [`TurnEvent::TextDelta`].
1003/// - Tool-role text (a tool-result echo) and empty/non-textual blocks produce
1004///   no event.
1005fn message_to_event(msg: Message) -> Option<TurnEvent> {
1006    let content_block = msg.content.into_option()?;
1007    match content_block.r#type? {
1008        content::Type::ToolCall(tc) => {
1009            let name = match tc.r#type.as_ref() {
1010                Some(tool_call_content::Type::FunctionCall(fc)) if !fc.name.is_empty() => {
1011                    fc.name.clone()
1012                }
1013                _ => tc.id.clone(),
1014            };
1015            Some(TurnEvent::ToolStarted { name })
1016        }
1017        content::Type::Text(t) => {
1018            if is_assistant_role(&msg.role) && !t.text.is_empty() {
1019                Some(TurnEvent::TextDelta(t.text))
1020            } else {
1021                // Tool-role text is an intermediate result echo; empty text and
1022                // non-assistant roles carry nothing the user should see.
1023                None
1024            }
1025        }
1026        // Thoughts, tool results, and media variants have no streaming event in
1027        // this version.
1028        _ => None,
1029    }
1030}
1031
/// Report whether `role` marks a message as the assistant's own answer text.
///
/// The control plane uses `model` (the provider-native name) and `assistant`
/// interchangeably for agent replies, so either one qualifies. The comparison
/// is exact and case-sensitive.
fn is_assistant_role(role: &str) -> bool {
    matches!(role, "model" | "assistant")
}
1039
1040/// Render one [`content::Type`] variant as user-visible text.
1041///
1042/// Returns `None` for variants that have no useful textual representation
1043/// (image/audio/document/video/confirmation) so the `join("\n")` in
1044/// [`AgentDialer::run_turn`] doesn't emit stray blank lines.
1045///
1046/// Tool calls are rendered (rather than skipped) so a turn that ends on a
1047/// `tool_call` with no follow-up text — e.g. a HITL pause or approval-only
1048/// confirmation flow — gives the user something to see instead of an empty
1049/// body.
1050fn render_content(ty: Option<content::Type>) -> Option<String> {
1051    match ty? {
1052        content::Type::Text(t) => {
1053            if t.text.is_empty() {
1054                None
1055            } else {
1056                Some(t.text)
1057            }
1058        }
1059        content::Type::ToolCall(tc) => {
1060            let name = match tc.r#type.as_ref() {
1061                Some(tool_call_content::Type::FunctionCall(fc)) => fc.name.as_str(),
1062                None => "",
1063            };
1064            if name.is_empty() {
1065                Some(format!("[tool_call:{}]", tc.id))
1066            } else {
1067                Some(format!("[tool_call:{name} {}]", tc.id))
1068            }
1069        }
1070        content::Type::ToolResult(tr) => Some(format!("[tool_result:{}]", tr.call_id)),
1071        content::Type::Thought(t) => {
1072            // ThoughtContent has no top-level text; its `summary` repeated
1073            // field carries the user-visible reasoning when the provider
1074            // emits one. Collect any TextContent summaries; skip otherwise.
1075            let mut buf = String::new();
1076            for s in t.summary {
1077                if let Some(thought_summary_content::Type::Text(text)) = s.r#type
1078                    && !text.text.is_empty()
1079                {
1080                    if !buf.is_empty() {
1081                        buf.push(' ');
1082                    }
1083                    buf.push_str(&text.text);
1084                }
1085            }
1086            if buf.is_empty() {
1087                None
1088            } else {
1089                Some(format!("[thought] {buf}"))
1090            }
1091        }
1092        // image / audio / document / video / confirmation — no useful text
1093        // rendering for v1.
1094        _ => None,
1095    }
1096}
1097
#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]
    use super::*;
    use polyc_proto::proto::polychrome::agent::v1::{
        Content, FunctionCallContent, Handoff as WireHandoff,
        PendingApproval as WirePendingApproval, TextContent, ThoughtContent,
        ThoughtSummaryContent, ToolCallContent, ToolResultContent,
    };

    // --- DialError classification ---

    #[test]
    fn dial_error_retryable_classification() {
        use connectrpc::ErrorCode;

        let connect_error =
            |code: ErrorCode| DialError::Connect(connectrpc::ConnectError::new(code, "x"));

        // Transient transport conditions: retryable, so redelivery is
        // requested.
        let transient = [
            ErrorCode::Unavailable,
            ErrorCode::DeadlineExceeded,
            ErrorCode::ResourceExhausted,
            ErrorCode::Aborted,
        ];
        for code in transient {
            let err = connect_error(code);
            assert_eq!(err.code(), Some(code));
            assert!(err.is_retryable(), "{code:?} should be retryable");
        }

        // Terminal codes: not retryable, since redelivery would loop forever.
        let terminal = [
            ErrorCode::InvalidArgument,
            ErrorCode::Unauthenticated,
            ErrorCode::NotFound,
            ErrorCode::PermissionDenied,
            ErrorCode::Internal,
        ];
        for code in terminal {
            let err = connect_error(code);
            assert_eq!(err.code(), Some(code));
            assert!(!err.is_retryable(), "{code:?} should not be retryable");
        }

        // Local failures have no Connect code and are never retryable.
        let raw_addr = "http://a b";
        let bad_addr = DialError::InvalidAddress {
            addr: raw_addr.to_owned(),
            source: raw_addr.parse::<http::Uri>().unwrap_err(),
        };
        assert_eq!(bad_addr.code(), None);
        assert!(!bad_addr.is_retryable());

        let tls = DialError::Tls("no provider".to_owned());
        assert_eq!(tls.code(), None);
        assert!(!tls.is_retryable());
    }

    // --- content-block fixtures ---

    fn text(s: &str) -> Option<content::Type> {
        let block = TextContent {
            text: s.to_owned(),
            ..Default::default()
        };
        Some(content::Type::Text(Box::new(block)))
    }

    fn tool_call(id: &str, name: &str) -> Option<content::Type> {
        let function = FunctionCallContent {
            name: name.to_owned(),
            ..Default::default()
        };
        let call = ToolCallContent {
            id: id.to_owned(),
            r#type: Some(tool_call_content::Type::FunctionCall(Box::new(function))),
            ..Default::default()
        };
        Some(content::Type::ToolCall(Box::new(call)))
    }

    // A tool call that carries an id but no function-call payload.
    fn bare_tool_call(id: &str) -> Option<content::Type> {
        let call = ToolCallContent {
            id: id.to_owned(),
            r#type: None,
            ..Default::default()
        };
        Some(content::Type::ToolCall(Box::new(call)))
    }

    fn tool_result(call_id: &str) -> Option<content::Type> {
        let result = ToolResultContent {
            call_id: call_id.to_owned(),
            ..Default::default()
        };
        Some(content::Type::ToolResult(Box::new(result)))
    }

    fn thought(summary: &str) -> Option<content::Type> {
        let summary_text = TextContent {
            text: summary.to_owned(),
            ..Default::default()
        };
        let entry = ThoughtSummaryContent {
            r#type: Some(thought_summary_content::Type::Text(Box::new(summary_text))),
            ..Default::default()
        };
        let block = ThoughtContent {
            summary: vec![entry],
            ..Default::default()
        };
        Some(content::Type::Thought(Box::new(block)))
    }

    // --- rendering (render_content) ---

    #[test]
    fn renders_text_verbatim() {
        let rendered = render_content(text("hi"));
        assert_eq!(rendered, Some("hi".to_owned()));
    }

    #[test]
    fn renders_tool_call_with_name_and_id() {
        let rendered = render_content(tool_call("call_42", "search"));
        assert_eq!(rendered, Some("[tool_call:search call_42]".to_owned()));
    }

    #[test]
    fn renders_tool_call_without_function_name() {
        let rendered = render_content(bare_tool_call("call_bare"));
        assert_eq!(rendered, Some("[tool_call:call_bare]".to_owned()));
    }

    #[test]
    fn renders_tool_result_by_call_id() {
        let rendered = render_content(tool_result("call_42"));
        assert_eq!(rendered, Some("[tool_result:call_42]".to_owned()));
    }

    #[test]
    fn renders_thought_summary() {
        let rendered = render_content(thought("considering options"));
        assert_eq!(rendered, Some("[thought] considering options".to_owned()));
    }

    #[test]
    fn empty_text_skipped() {
        assert_eq!(render_content(text("")), None);
    }

    #[test]
    fn unknown_variant_skipped() {
        // The content::Type is not set at all.
        assert_eq!(render_content(None), None);
    }

    // Aggregation helper: the same render-then-join that run_turn performs,
    // fed from a vector of fake content blocks instead of a live connectrpc
    // stream.
    fn aggregate(blocks: Vec<Option<content::Type>>) -> String {
        blocks
            .into_iter()
            .filter_map(render_content)
            .collect::<Vec<String>>()
            .join("\n")
    }

    #[test]
    fn aggregate_pure_text_turn() {
        let body = aggregate(vec![text("hello"), text("world")]);
        assert_eq!(body, "hello\nworld");
    }

    #[test]
    fn aggregate_tool_call_only_turn() {
        // A turn ending in a tool call without follow-up text used to come
        // back as "" and the Slack handler silently dropped it. The rendered
        // placeholder keeps the user informed.
        let body = aggregate(vec![tool_call("call_1", "lookup")]);
        assert_eq!(body, "[tool_call:lookup call_1]");
    }

    #[test]
    fn aggregate_mixed_text_and_tool_call() {
        let body = aggregate(vec![text("thinking..."), tool_call("call_1", "search")]);
        assert_eq!(body, "thinking...\n[tool_call:search call_1]");
    }

    // --- streaming mapping (message_to_event) ---

    fn message(role: &str, ty: Option<content::Type>) -> Message {
        let content = Content {
            r#type: ty,
            ..Default::default()
        };
        Message {
            role: role.to_owned(),
            content: buffa::MessageField::some(content),
            ..Default::default()
        }
    }

    #[test]
    fn assistant_text_becomes_text_delta() {
        let event = message_to_event(message("assistant", text("hello")));
        assert_eq!(event, Some(TurnEvent::TextDelta("hello".to_owned())));
    }

    #[test]
    fn model_role_also_counts_as_assistant() {
        let event = message_to_event(message("model", text("hi")));
        assert_eq!(event, Some(TurnEvent::TextDelta("hi".to_owned())));
    }

    #[test]
    fn tool_role_text_is_skipped() {
        // Text echoing a tool result must never surface as answer text.
        let event = message_to_event(message("tool", text("result blob")));
        assert_eq!(event, None);
    }

    #[test]
    fn empty_assistant_text_is_skipped() {
        let event = message_to_event(message("assistant", text("")));
        assert_eq!(event, None);
    }

    #[test]
    fn tool_call_becomes_tool_started_with_name() {
        let event = message_to_event(message("model", tool_call("call_7", "search")));
        assert_eq!(
            event,
            Some(TurnEvent::ToolStarted {
                name: "search".to_owned()
            })
        );
    }

    #[test]
    fn tool_call_falls_back_to_call_id() {
        let event = message_to_event(message("tool", bare_tool_call("call_bare")));
        assert_eq!(
            event,
            Some(TurnEvent::ToolStarted {
                name: "call_bare".to_owned()
            })
        );
    }

    #[test]
    fn tool_result_produces_no_event() {
        let event = message_to_event(message("tool", tool_result("call_7")));
        assert_eq!(event, None);
    }

    // Runs the per-message mapping used by the streaming loop over a
    // synthetic turn and then appends Done, as the End arm would.
    fn map_turn(msgs: Vec<Message>) -> Vec<TurnEvent> {
        msgs.into_iter()
            .filter_map(message_to_event)
            .chain(std::iter::once(TurnEvent::Done))
            .collect()
    }

    #[test]
    fn synthetic_turn_yields_expected_event_sequence() {
        // A model text delta, a tool-role tool call, more model text, End.
        let turn = vec![
            message("model", text("Let me look that up.")),
            message("tool", tool_call("call_1", "search")),
            message("model", text("Found it.")),
        ];
        let expected = vec![
            TurnEvent::TextDelta("Let me look that up.".to_owned()),
            TurnEvent::ToolStarted {
                name: "search".to_owned(),
            },
            TurnEvent::TextDelta("Found it.".to_owned()),
            TurnEvent::Done,
        ];
        assert_eq!(map_turn(turn), expected);
    }

    // --- terminal-envelope projection (events_from_end) ---

    #[test]
    fn end_with_nothing_yields_only_done() {
        let events = events_from_end(AgentEnd::default());
        assert_eq!(events, vec![TurnEvent::Done]);
    }

    #[test]
    fn end_with_handoff_yields_handoff_then_done() {
        let handoff = WireHandoff {
            call_id: "call_1".to_owned(),
            child_agent_id: "researcher".to_owned(),
            reason: "needs deep dive".to_owned(),
            ..Default::default()
        };
        let end = AgentEnd {
            handoff: buffa::MessageField::some(handoff),
            ..Default::default()
        };
        let expected = vec![
            TurnEvent::HandoffStarted {
                child_agent_id: "researcher".to_owned(),
                reason: "needs deep dive".to_owned(),
            },
            TurnEvent::Done,
        ];
        assert_eq!(events_from_end(end), expected);
    }

    #[test]
    fn end_orders_approvals_before_handoff_before_done() {
        let approval = WirePendingApproval {
            request_id: "r1".to_owned(),
            tool_name: "delete_file".to_owned(),
            args_json: "{}".to_owned(),
            title: "Delete a file".to_owned(),
            ..Default::default()
        };
        let handoff = WireHandoff {
            child_agent_id: "child".to_owned(),
            ..Default::default()
        };
        let end = AgentEnd {
            pending_approvals: vec![approval],
            handoff: buffa::MessageField::some(handoff),
            ..Default::default()
        };
        let expected = vec![
            TurnEvent::ApprovalPending {
                request_id: "r1".to_owned(),
                tool_name: "delete_file".to_owned(),
                title: "Delete a file".to_owned(),
                args_json: "{}".to_owned(),
            },
            TurnEvent::HandoffStarted {
                child_agent_id: "child".to_owned(),
                reason: String::new(),
            },
            TurnEvent::Done,
        ];
        assert_eq!(events_from_end(end), expected);
    }

    // --- namespaced conversation ids ---

    #[test]
    fn namespaced_id_is_prefix_colon_native() {
        let github = namespaced_id("github", "owner/repo#42");
        assert_eq!(github, "github:owner/repo#42");
        let web = namespaced_id("web", "abc-123");
        assert_eq!(web, "web:abc-123");
    }

    #[test]
    fn hashed_conversation_id_is_deterministic_v5() {
        let ns = uuid::Uuid::from_u128(0x1234_5678_9abc_4def_8123_4567_89ab_cdef);
        let first = hashed_conversation_id(ns, &["T1", "C1", "169.000"]);
        let second = hashed_conversation_id(ns, &["T1", "C1", "169.000"]);
        let other_channel = hashed_conversation_id(ns, &["T1", "C2", "169.000"]);
        assert_eq!(first, second);
        assert_ne!(first, other_channel);
        let parsed: uuid::Uuid = first.parse().unwrap();
        assert_eq!(parsed.get_version_num(), 5);
    }

    #[test]
    fn framed_conversation_id_resists_separator_collisions() {
        let ns = uuid::Uuid::from_u128(0x99);
        let left_heavy = ["a:b", "c"];
        let right_heavy = ["a", "b:c"];
        // Precisely the pair that collides under the bare-join helper.
        assert_ne!(
            framed_conversation_id(ns, &left_heavy),
            framed_conversation_id(ns, &right_heavy),
        );
        // Sanity: the bare-join helper really does collide here, which is
        // why the framed variant exists.
        assert_eq!(
            hashed_conversation_id(ns, &left_heavy),
            hashed_conversation_id(ns, &right_heavy),
        );
        // Deterministic, and a valid v5 UUID.
        let id = framed_conversation_id(ns, &["mail", "<abc@x>"]);
        assert_eq!(id, framed_conversation_id(ns, &["mail", "<abc@x>"]));
        assert_eq!(id.parse::<uuid::Uuid>().unwrap().get_version_num(), 5);
    }

    #[test]
    fn hashed_conversation_id_matches_slacks_inline_algorithm() {
        // Golden cross-check: the SDK helper has to reproduce exactly what
        // the Slack edge used to compute inline (UUIDv5 of
        // "team:channel:thread" under a pinned namespace), so that delegating
        // in `polychrome-slack` leaves every existing conversation id
        // unchanged.
        let ns = uuid::Uuid::from_u128(0xa1b2_c3d4_e5f6_4789_abcd_ef01_2345_6789);
        let parts = ["T01234ABCD", "C0B5V0JA401", "1700000000.000100"];
        let inline = uuid::Uuid::new_v5(&ns, parts.join(":").as_bytes())
            .hyphenated()
            .to_string();
        assert_eq!(hashed_conversation_id(ns, &parts), inline);
    }

    // --- link ceremony presentation ---

    #[test]
    fn link_outcome_messages_honor_presentation_rules() {
        // INVALID and EXPIRED are fused upstream into a single variant, so no
        // edge can tell them apart: one shared message, no oracle.
        let fused = LinkCeremony::InvalidOrExpired.user_message();
        assert!(fused.contains("invalid or expired"));
        // THROTTLED reads distinctly (wait, don't retry).
        assert!(LinkCeremony::Throttled.user_message().contains("wait"));
        let linked = LinkCeremony::Linked {
            persona_id: "p".to_owned(),
        };
        assert!(linked.user_message().contains("Linked"));
    }
}