polyc_rpc_client/lib.rs
1//! Thin connectrpc client over the generated
2//! [`polyc_proto::proto::polychrome::agent::v1::AgentServiceClient`].
3//!
4//! Both the operator CLI (`polychrome send`) and the reference-edge receiver dial the
5//! external-facing `AgentService` with the same Connect-streaming dance: build
6//! a connect-rust client, send one [`AgentRequest`], drain the
7//! [`AgentResponse`](polyc_proto::proto::polychrome::agent::v1::AgentResponse)
8//! stream, and render each non-empty content block to user-visible text. This
9//! crate is the single home for that logic so the two call sites can't drift.
10//!
11//! RPC-client concerns deliberately live here rather than in
12//! `polyc-agent` (the turn-loop crate, the wrong layer for a transport
13//! client).
14//!
//! [`AgentDialer::run_turn`] collects the whole turn into a single string and
//! returns it at end-of-turn; the `run_turn_streaming*` methods on the same
//! client surface the turn incrementally as [`TurnEvent`]s for live surfaces
//! (e.g. Slack `chat.appendStream`).
18
19#![forbid(unsafe_code)]
20#![warn(missing_docs)]
21
22pub mod edge;
23pub use edge::{EdgeAdapter, build_attribution};
24
25use std::sync::Arc;
26
27use connectrpc::client::{ClientConfig, HttpClient};
28use futures::Stream;
29use polyc_agent::text_message;
30use polyc_proto::proto::polychrome::agent::v1::{
31 AgentEnd, AgentRequest, AgentServiceClient, AgentStart, ClassifyRequest, Message,
32 ParticipantMessage, Verdict, agent_response, content, thought_summary_content,
33 tool_call_content,
34};
35use polyc_proto::proto::polychrome::approval::v1::{
36 ApprovalResponseRequest, ApprovalServiceClient,
37};
38use polyc_proto::proto::polychrome::persona::v1::{
39 CompleteLinkRequest, DescribeRequest, LinkOutcome, PersonaServiceClient, StartDeepLinkRequest,
40 StartLinkRequest,
41};
42
43/// Errors an agent dial can produce.
44#[derive(Debug, thiserror::Error)]
45pub enum DialError {
46 /// Could not parse the configured agent address as a URI.
47 #[error("invalid agent address {addr:?}: {source}")]
48 InvalidAddress {
49 /// The address string that failed to parse.
50 addr: String,
51 /// The underlying URI parse error.
52 #[source]
53 source: http::uri::InvalidUri,
54 },
55 /// Building the TLS client for an `https://` endpoint failed (e.g. no
56 /// process-default crypto provider). The dial fails closed rather than
57 /// silently downgrading to plaintext.
58 #[error("tls setup failed for agent address: {0}")]
59 Tls(String),
60 /// Connect-level error from the `AgentService` stream.
61 #[error(transparent)]
62 Connect(#[from] connectrpc::ConnectError),
63}
64
65impl DialError {
66 /// The Connect error code, when this is a transport-level Connect error
67 /// (`None` for local address/TLS-setup failures).
68 #[must_use]
69 pub const fn code(&self) -> Option<connectrpc::ErrorCode> {
70 match self {
71 Self::Connect(e) => Some(e.code),
72 Self::InvalidAddress { .. } | Self::Tls(_) => None,
73 }
74 }
75
76 /// Whether retrying the call could plausibly succeed. Only transient
77 /// transport conditions are retryable; terminal codes
78 /// (`InvalidArgument`, `Unauthenticated`, `NotFound`, …) and local
79 /// address/TLS failures are not. Edges use this to decide whether to ask
80 /// the source platform to redeliver (retryable) or to drop / 4xx
81 /// (terminal — redelivery would loop forever).
82 #[must_use]
83 pub const fn is_retryable(&self) -> bool {
84 matches!(
85 self.code(),
86 Some(
87 connectrpc::ErrorCode::Unavailable
88 | connectrpc::ErrorCode::DeadlineExceeded
89 | connectrpc::ErrorCode::ResourceExhausted
90 | connectrpc::ErrorCode::Aborted
91 )
92 )
93 }
94}
95
/// Per-call deadline applied to every agent turn (sent as
/// `Connect-Timeout-Ms`). Three minutes: turns can be long (tool loops, slow
/// providers), so the budget is generous; edges wrap their own outer
/// `tokio::time::timeout` around the call for defence.
const AGENT_DIAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3 * 60);
100
/// Per-call deadline for the short control-plane RPCs (approval responses,
/// persona lookups and link ceremonies), none of which run a turn.
const CONTROL_DIAL_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(30_000);
104
105/// Build the Connect HTTP transport for `uri`, honoring its scheme: `https`
106/// dials over TLS (OS trust store); anything else (incl. a scheme-less
107/// `host:port`) uses plaintext. An `https` address is **never** silently
108/// downgraded — a TLS build failure surfaces as [`DialError::Tls`].
109fn http_client_for(uri: &http::Uri) -> Result<HttpClient, DialError> {
110 if uri.scheme_str() == Some("https") {
111 use rustls_platform_verifier::ConfigVerifierExt;
112 let tls = rustls::ClientConfig::with_platform_verifier()
113 .map_err(|e| DialError::Tls(e.to_string()))?;
114 Ok(HttpClient::with_tls(std::sync::Arc::new(tls)))
115 } else {
116 Ok(HttpClient::plaintext())
117 }
118}
119
120/// Per-call options carrying the active span's W3C `traceparent`, so the
121/// control-plane handler re-parents on this turn's span instead of starting a
122/// fresh trace. Cheap when no propagator is installed (the global getter is a
123/// no-op and no header is set). The per-call deadline comes from the client's
124/// `with_default_timeout`, so it need not be repeated here.
125fn traced_options() -> connectrpc::client::CallOptions {
126 let mut headers = http::HeaderMap::new();
127 polyc_runtime::propagation::inject_current_span_into(&mut headers);
128 connectrpc::client::CallOptions::default()
129 .with_headers(headers.into_iter().filter_map(|(n, v)| n.map(|n| (n, v))))
130}
131
/// One step of a turn, emitted while the turn streams from the control plane.
///
/// [`AgentDialer::run_turn`] folds a whole turn into a single string; the
/// streaming API instead hands out each meaningful step as it arrives, so a
/// live surface (e.g. Slack `chat.appendStream`) can update in place.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TurnEvent {
    /// A piece of the assistant's answer text (model/assistant role).
    TextDelta(String),
    /// A tool call began; intended for a user-visible "thinking step".
    ToolStarted {
        /// The tool/function name, falling back to the call id when the name
        /// is absent.
        name: String,
    },
    /// The turn paused ahead of a tool call that needs human approval.
    ///
    /// One event per pending call, taken from the terminal `AgentEnd` and
    /// emitted just before [`TurnEvent::Done`]. The caller answers through
    /// `ApprovalService.Respond` (the THIN path) and then re-drives the turn.
    ApprovalPending {
        /// The tool-call id, which doubles as the approval `request_id` to
        /// answer.
        request_id: String,
        /// Raw machine identifier of the tool/function awaiting approval; the
        /// field of record for trust/audit.
        tool_name: String,
        /// Human display label (MCP-style `title`) shown in the approval
        /// prompt. May be empty, in which case the surface derives one from
        /// `tool_name`.
        title: String,
        /// The call's arguments, as JSON.
        args_json: String,
    },
    /// The turn suspended to delegate to a sub-agent because the model invoked
    /// the reserved `__handoff_to` primitive.
    ///
    /// Emitted once, from the terminal `AgentEnd.handoff`, just before
    /// [`TurnEvent::Done`]. The child conversation runs on its own and the
    /// parent resumes when it returns, so an edge can show "delegating…"
    /// instead of going silent.
    HandoffStarted {
        /// The chosen child agent / planner; empty means the parent's default
        /// planner.
        child_agent_id: String,
        /// Free-form reason, recorded for operator visibility.
        reason: String,
    },
    /// Terminal marker: the turn is over and nothing follows.
    Done,
}
178
179/// Reusable handle for dialing the polychrome control plane.
180#[derive(Clone)]
181pub struct AgentDialer {
182 client: Arc<AgentServiceClient<HttpClient>>,
183}
184
185impl AgentDialer {
186 /// Build a dialer pointed at `addr` (expects `http://host:port`).
187 ///
188 /// # Errors
189 ///
190 /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
191 pub fn new(addr: &str) -> Result<Self, DialError> {
192 let uri = addr
193 .parse::<http::Uri>()
194 .map_err(|source| DialError::InvalidAddress {
195 addr: addr.to_owned(),
196 source,
197 })?;
198 let http = http_client_for(&uri)?;
199 let config = ClientConfig::new(uri).with_default_timeout(AGENT_DIAL_TIMEOUT);
200 let client = AgentServiceClient::new(http, config);
201 Ok(Self {
202 client: Arc::new(client),
203 })
204 }
205
206 /// Build an [`ApprovalDialer`] for the SAME control-plane endpoint.
207 /// `AgentService` and `ApprovalService` are served on one Connect port, so
208 /// an approval client reuses the agent address — callers that already hold
209 /// an `AgentDialer` don't need to thread a second address.
210 ///
211 /// # Errors
212 /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
213 pub fn approval_dialer(addr: &str) -> Result<ApprovalDialer, DialError> {
214 ApprovalDialer::new(addr)
215 }
216
217 /// Run one turn against the control plane and collect the aggregated
218 /// text response.
219 ///
220 /// `conversation_id` is the stable id for the conversation (the Slack
221 /// adapter derives it from the thread; the CLI takes it as an argument).
222 /// `user_text` is the user message with any bot mention already stripped.
223 /// Returns the concatenated assistant text across every batch in the
224 /// response stream, or `Ok(String::new())` if the turn produced no text.
225 /// Non-text content variants (tool calls, tool results, thoughts) are
226 /// rendered as bracketed placeholders.
227 ///
228 /// # Errors
229 ///
230 /// Returns [`DialError::Connect`] for any transport/stream/encoding
231 /// error from the `AgentService` call.
232 pub async fn run_turn(
233 &self,
234 conversation_id: &str,
235 exec_id: &str,
236 user_text: &str,
237 ) -> Result<String, DialError> {
238 self.run_turn_with(conversation_id, exec_id, user_text, Attribution::default())
239 .await
240 }
241
242 /// Like [`run_turn`](Self::run_turn) but attributes the turn to a caller
243 /// (and participants). Non-streaming edges that resolve an identity via
244 /// [`EdgeAdapter::caller`] use this so their turns populate
245 /// `AgentStart.caller` (persona attribution) — the buffered analog of
246 /// [`run_turn_streaming_messages_with`](Self::run_turn_streaming_messages_with).
247 ///
248 /// # Errors
249 ///
250 /// Returns [`DialError::Connect`] for any transport/stream/encoding error.
251 pub async fn run_turn_with(
252 &self,
253 conversation_id: &str,
254 exec_id: &str,
255 user_text: &str,
256 attribution: Attribution,
257 ) -> Result<String, DialError> {
258 let request = build_request(
259 conversation_id,
260 exec_id,
261 vec![text_message("user", user_text)],
262 None,
263 attribution,
264 );
265
266 let mut stream = self
267 .client
268 .connect_with_options(request, traced_options())
269 .await?;
270 // The assistant's prose answer. Tool calls, tool results, and thoughts
271 // are intermediate scaffolding — they're aggregated separately and only
272 // surfaced when the turn produced no text at all (e.g. a turn that ends
273 // on a tool_call / approval pause), so a normal answer reads cleanly
274 // instead of "[tool_call:…]\n[tool_result:…]\nIt is 3pm."
275 let mut text_parts: Vec<String> = Vec::new();
276 let mut scaffolding: Vec<String> = Vec::new();
277 while let Some(view) = stream.message().await? {
278 let response = view.to_owned_message();
279 match response.r#type {
280 Some(agent_response::Type::Outputs(outputs)) => {
281 for msg in outputs.messages {
282 // The turn stream carries every message the turn
283 // produced — including the tool-execution result, which
284 // the control plane emits as a `tool`-role Text block
285 // (the raw tool output, e.g. `{"now":"…"}`). Only the
286 // `model`/`assistant` text is the user-facing answer;
287 // tool-role text is intermediate data, not the reply.
288 let is_assistant = matches!(msg.role.as_str(), "model" | "assistant");
289 let Some(content_block) = msg.content.into_option() else {
290 continue;
291 };
292 match content_block.r#type {
293 Some(content::Type::Text(t)) if is_assistant && !t.text.is_empty() => {
294 text_parts.push(t.text);
295 }
296 // Non-assistant text (tool result echo, user) is not
297 // the reply — drop it entirely (not even fallback).
298 Some(content::Type::Text(_)) => {}
299 other => {
300 if let Some(rendered) = render_content(other) {
301 scaffolding.push(rendered);
302 }
303 }
304 }
305 }
306 }
307 // An explicit AgentEnd ends the turn. The stream closing
308 // without one is an implicit end-of-turn; we still return
309 // whatever was aggregated.
310 Some(agent_response::Type::End(_)) => break,
311 None => {
312 // Empty envelope — defensive; the control plane shouldn't
313 // send these but the wire allows it. Ignore.
314 }
315 }
316 }
317 let reply = if text_parts.is_empty() {
318 scaffolding.join("\n")
319 } else {
320 text_parts.join("\n")
321 };
322 Ok(reply)
323 }
324
325 /// Run one turn against the control plane and stream each meaningful step
326 /// as a [`TurnEvent`], for live surfaces that update in place.
327 ///
328 /// The request is built identically to [`AgentDialer::run_turn`]; see that
329 /// method for the meaning of `conversation_id`, `exec_id`, and
330 /// `user_text`. The returned stream yields:
331 ///
332 /// - [`TurnEvent::TextDelta`] for each model/assistant-role text block,
333 /// - [`TurnEvent::ToolStarted`] when a tool call begins, and
334 /// - [`TurnEvent::Done`] once at end-of-turn, after which the stream ends.
335 ///
336 /// Tool-role result echoes and empty/non-textual blocks produce no event.
337 ///
338 /// # Errors
339 ///
340 /// The outer `Result` carries a [`DialError::Connect`] if opening the
341 /// stream fails. Each item is a `Result` so per-message transport/decode
342 /// errors surface inline without tearing down the whole stream type.
343 pub async fn run_turn_streaming(
344 &self,
345 conversation_id: &str,
346 exec_id: &str,
347 user_text: &str,
348 ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
349 self.run_turn_streaming_messages(
350 conversation_id,
351 exec_id,
352 vec![text_message("user", user_text)],
353 )
354 .await
355 }
356
357 /// Like [`Self::run_turn_streaming`] but takes a pre-built (e.g. attributed
358 /// multi-party) message list as the turn input.
359 ///
360 /// # Errors
361 ///
362 /// Returns [`DialError::Connect`] for any transport/stream/encoding error.
363 pub async fn run_turn_streaming_messages(
364 &self,
365 conversation_id: &str,
366 exec_id: &str,
367 messages: Vec<Message>,
368 ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
369 self.run_turn_streaming_messages_with(
370 conversation_id,
371 exec_id,
372 messages,
373 None,
374 Attribution::default(),
375 )
376 .await
377 }
378
379 /// The full-fidelity variant: one method carries everything a turn's
380 /// request can — a settled inbound [`PaymentReceipt`] (the control plane
381 /// persists a signed `payment_receipt` event in the turn's atomic batch)
382 /// and the caller [`Attribution`] (resolved to durable personas and
383 /// recorded as `caller`/`participant` events). One method rather than a
384 /// matrix of variants, so a paid *and* attributed edge can't silently
385 /// drop one of the two.
386 ///
387 /// # Errors
388 ///
389 /// Returns [`DialError::Connect`] for any transport/stream/encoding error.
390 pub async fn run_turn_streaming_messages_with(
391 &self,
392 conversation_id: &str,
393 exec_id: &str,
394 messages: Vec<Message>,
395 payment_receipt: Option<PaymentReceipt>,
396 attribution: Attribution,
397 ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
398 let request = build_request(
399 conversation_id,
400 exec_id,
401 messages,
402 payment_receipt,
403 attribution,
404 );
405 self.run_turn_streaming_request(request).await
406 }
407
408 /// Open the connect stream for one pre-built request and project the
409 /// response envelopes into [`TurnEvent`]s — the single transport body
410 /// every streaming variant shares.
411 async fn run_turn_streaming_request(
412 &self,
413 request: AgentRequest,
414 ) -> Result<impl Stream<Item = Result<TurnEvent, DialError>>, DialError> {
415 let mut stream = self
416 .client
417 .connect_with_options(request, traced_options())
418 .await?;
419 Ok(async_stream::try_stream! {
420 while let Some(view) = stream.message().await? {
421 let response = view.to_owned_message();
422 match response.r#type {
423 Some(agent_response::Type::Outputs(outputs)) => {
424 for msg in outputs.messages {
425 if let Some(event) = message_to_event(msg) {
426 yield event;
427 }
428 }
429 }
430 Some(agent_response::Type::End(end)) => {
431 // The terminal envelope's extensions (pending approvals,
432 // a sub-agent handoff) surface before the terminal Done
433 // so a live surface can prompt / show "delegating…".
434 for event in events_from_end(*end) {
435 yield event;
436 }
437 return;
438 }
439 // Empty envelope — defensive; ignore (mirrors `run_turn`).
440 None => {}
441 }
442 }
443 // Stream closed without an explicit AgentEnd: still signal a
444 // terminal event so the caller can finalise the live surface.
445 yield TurnEvent::Done;
446 })
447 }
448
449 /// Ask the control plane's participation gate whether to reply to the
450 /// latest message of a (multi-party) thread. Returns `true` only on
451 /// `respond`; `notify` / `ignore` map to `false` (stay silent). The gate
452 /// runs a cheap classifier model server-side and never runs a turn.
453 ///
454 /// # Errors
455 ///
456 /// Returns [`DialError::Connect`] for any transport/encoding error.
457 pub async fn should_respond(
458 &self,
459 conversation_id: &str,
460 bot_name: &str,
461 transcript: Vec<ParticipantMessage>,
462 ) -> Result<bool, DialError> {
463 let request = ClassifyRequest {
464 conversation_id: conversation_id.to_owned(),
465 bot_name: bot_name.to_owned(),
466 transcript,
467 ..Default::default()
468 };
469 let resp = self
470 .client
471 .classify_with_options(request, traced_options())
472 .await?
473 .into_owned();
474 Ok(resp.verdict.to_i32() == Verdict::VERDICT_RESPOND as i32)
475 }
476}
477
/// What the control plane recorded for an `ApprovalService.Respond` call.
///
/// On the THIN path the control plane is the signer: the caller sends an
/// UNSIGNED decision, the control plane appends a server-signed
/// `approval_response` event, and the signature comes back so the caller can
/// show / audit a verifiable outcome.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApprovalOutcome {
    /// `true` when a matching pending request existed and the response was
    /// persisted; `false` is the idempotent no-op (already answered /
    /// unknown).
    pub persisted: bool,
    /// Lowercase-hex ed25519 signature over the canonical response bytes.
    pub signature_hex: String,
    /// Lowercase-hex public key that the signature verifies against.
    pub signed_by_hex: String,
}
494
495/// Reusable handle for the control plane's `ApprovalService`.
496///
497/// The THIN human-in-the-loop path. Shares the `AgentService` endpoint — both
498/// are served on one Connect port — so it is built from the same address.
499#[derive(Clone)]
500pub struct ApprovalDialer {
501 client: Arc<ApprovalServiceClient<HttpClient>>,
502}
503
504impl ApprovalDialer {
505 /// Build a dialer pointed at `addr` (expects `http://host:port`).
506 ///
507 /// # Errors
508 /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
509 pub fn new(addr: &str) -> Result<Self, DialError> {
510 let uri = addr
511 .parse::<http::Uri>()
512 .map_err(|source| DialError::InvalidAddress {
513 addr: addr.to_owned(),
514 source,
515 })?;
516 let http = http_client_for(&uri)?;
517 let client = ApprovalServiceClient::new(
518 http,
519 ClientConfig::new(uri).with_default_timeout(CONTROL_DIAL_TIMEOUT),
520 );
521 Ok(Self {
522 client: Arc::new(client),
523 })
524 }
525
526 /// Submit a human's decision for a pending approval. The decision is
527 /// UNSIGNED — the control plane signs and persists it (THIN path) and
528 /// returns the signature. Idempotent: answering an already-decided or
529 /// unknown `request_id` returns `persisted: false`.
530 ///
531 /// # Errors
532 /// Returns [`DialError::Connect`] for any transport/encoding error.
533 pub async fn respond(
534 &self,
535 request_id: &str,
536 approved: bool,
537 reason: &str,
538 conversation_id: &str,
539 ) -> Result<ApprovalOutcome, DialError> {
540 let request = ApprovalResponseRequest {
541 request_id: request_id.to_owned(),
542 approved,
543 reason: reason.to_owned(),
544 conversation_id: conversation_id.to_owned(),
545 ..Default::default()
546 };
547 let reply = self
548 .client
549 .respond_with_options(request, traced_options())
550 .await?
551 .into_owned();
552 Ok(ApprovalOutcome {
553 persisted: reply.persisted,
554 signature_hex: reply.signature_hex,
555 signed_by_hex: reply.signed_by_hex,
556 })
557 }
558}
559
/// A freshly minted link-ceremony challenge: the code an edge must deliver
/// privately to the requesting user, plus its absolute expiry.
///
/// `Debug` is implemented by hand so that the credential in
/// [`StartedLink::code`] is redacted: formatting this value with `{:?}` (for
/// example from a log statement) never prints the code.
#[derive(Clone, PartialEq, Eq)]
pub struct StartedLink {
    /// The single-use 6-digit code (a credential — never log or display it
    /// outside a private channel the user owns).
    pub code: String,
    /// Unix-ms after which the code is refused.
    pub expires_at_ms: u64,
    /// The persona the code is bound to.
    pub persona_id: String,
}

impl std::fmt::Debug for StartedLink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StartedLink")
            // The code is a credential: show that the field exists, never its value.
            .field("code", &format_args!("<redacted>"))
            .field("expires_at_ms", &self.expires_at_ms)
            .field("persona_id", &self.persona_id)
            .finish()
    }
}
572
/// A freshly minted deep-link token: the opaque value an edge embeds in a
/// platform URL for a no-typing ceremony, plus its absolute expiry.
///
/// `Debug` is implemented by hand so that the credential in
/// [`StartedDeepLink::token`] is redacted: formatting this value with `{:?}`
/// (for example from a log statement) never prints the token.
#[derive(Clone, PartialEq, Eq)]
pub struct StartedDeepLink {
    /// The single-use, high-entropy token (a credential — only ever placed in
    /// a deep-link URL delivered to a private channel the user owns).
    pub token: String,
    /// Unix-ms after which the token is refused.
    pub expires_at_ms: u64,
    /// The persona the token is bound to.
    pub persona_id: String,
}

impl std::fmt::Debug for StartedDeepLink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StartedDeepLink")
            // The token is a credential: show that the field exists, never its value.
            .field("token", &format_args!("<redacted>"))
            .field("expires_at_ms", &self.expires_at_ms)
            .field("persona_id", &self.persona_id)
            .finish()
    }
}
585
/// How completing a link ceremony turned out, in terms an edge can render.
///
/// The proto's `InvalidCode` and `Expired` outcomes are deliberately folded
/// into a single variant: the contract is that edges present them
/// identically, so a guesser cannot learn whether a code was ever valid. The
/// distinction survives only in the control plane's logs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LinkCeremony {
    /// The identity is now bound to the persona (directly or by merge).
    Linked {
        /// Id of the surviving persona.
        persona_id: String,
    },
    /// The identity already resolved to that persona; the code was consumed.
    AlreadyLinked {
        /// Id of the persona.
        persona_id: String,
    },
    /// No usable code (never minted, already consumed, or expired). All three
    /// look the same to the user.
    InvalidOrExpired,
    /// The completing identity is locked out after repeated failures (or the
    /// deployment-wide backstop tripped). Rendered distinctly: "wait".
    Throttled,
    /// An unexpected/unspecified outcome — the edge should report a generic
    /// failure rather than claim success.
    Failed,
}

impl LinkCeremony {
    /// The user-facing message for this outcome, shared by every edge.
    ///
    /// Because invalid and expired codes are one variant, no edge can tell
    /// them apart (no oracle for a guesser). Rendering the text here keeps
    /// that security-relevant wording identical on every surface. The
    /// phrasing is channel-neutral; an edge that needs surface-specific text
    /// can still match on the variant.
    #[must_use]
    pub const fn user_message(&self) -> &'static str {
        const LINKED: &str =
            "✅ Linked — this account now shares one Polychrome persona with your other channels.";
        const ALREADY_LINKED: &str = "✅ Already linked — this account was already on that persona.";
        const INVALID_OR_EXPIRED: &str =
            "That link is invalid or expired. Start a fresh one from your other channel and try again.";
        const THROTTLED: &str = "Too many attempts. Please wait a few minutes and try again.";
        const FAILED: &str = "Sorry — something went wrong completing that link. Please try again.";

        match self {
            Self::Failed => FAILED,
            Self::Throttled => THROTTLED,
            Self::InvalidOrExpired => INVALID_OR_EXPIRED,
            Self::AlreadyLinked { .. } => ALREADY_LINKED,
            Self::Linked { .. } => LINKED,
        }
    }
}
640
641/// Reusable handle for the control plane's `PersonaService`.
642///
643/// The verified identity-linking ceremonies. Shares the `AgentService`
644/// endpoint — all three services are served on one internal Connect port —
645/// so it is built from the same address.
646#[derive(Clone)]
647pub struct PersonaDialer {
648 client: Arc<PersonaServiceClient<HttpClient>>,
649}
650
651impl PersonaDialer {
652 /// Build a dialer pointed at `addr` (expects `http://host:port`).
653 ///
654 /// # Errors
655 /// Returns [`DialError::InvalidAddress`] if `addr` isn't a valid URI.
656 pub fn new(addr: &str) -> Result<Self, DialError> {
657 let uri = addr
658 .parse::<http::Uri>()
659 .map_err(|source| DialError::InvalidAddress {
660 addr: addr.to_owned(),
661 source,
662 })?;
663 let http = http_client_for(&uri)?;
664 let client = PersonaServiceClient::new(
665 http,
666 ClientConfig::new(uri).with_default_timeout(CONTROL_DIAL_TIMEOUT),
667 );
668 Ok(Self {
669 client: Arc::new(client),
670 })
671 }
672
673 /// Mint a single-use link-ceremony code bound to `identity`'s persona
674 /// (provisioning one on first contact). The returned [`StartedLink::code`]
675 /// is a credential the caller must deliver only to a private channel the
676 /// user owns.
677 ///
678 /// # Errors
679 /// Returns [`DialError::Connect`] for any transport/encoding error.
680 pub async fn start_link(&self, identity: ExternalIdentity) -> Result<StartedLink, DialError> {
681 let request = StartLinkRequest {
682 identity: buffa::MessageField::some(identity),
683 ..Default::default()
684 };
685 let reply = self
686 .client
687 .start_link_with_options(request, traced_options())
688 .await?
689 .into_owned();
690 Ok(StartedLink {
691 code: reply.code,
692 expires_at_ms: reply.expires_at_ms,
693 persona_id: reply.persona_id,
694 })
695 }
696
697 /// Mint a high-entropy deep-link token bound to `identity`'s persona, for
698 /// a no-typing ceremony (the caller embeds [`StartedDeepLink::token`] in a
699 /// platform URL like `https://t.me/<bot>?start=<token>`). The token is a
700 /// credential — deliver the URL only to a private channel the user owns.
701 /// Completed via [`Self::complete_link`] from the target platform.
702 ///
703 /// # Errors
704 /// Returns [`DialError::Connect`] for any transport/encoding error.
705 pub async fn start_deeplink(
706 &self,
707 identity: ExternalIdentity,
708 ) -> Result<StartedDeepLink, DialError> {
709 let request = StartDeepLinkRequest {
710 identity: buffa::MessageField::some(identity),
711 ..Default::default()
712 };
713 let reply = self
714 .client
715 .start_deep_link_with_options(request, traced_options())
716 .await?
717 .into_owned();
718 Ok(StartedDeepLink {
719 token: reply.token,
720 expires_at_ms: reply.expires_at_ms,
721 persona_id: reply.persona_id,
722 })
723 }
724
725 /// Consume `code` from the channel being claimed, binding `identity` to
726 /// the minting persona (merging `identity`'s prior persona if it had one).
727 ///
728 /// # Errors
729 /// Returns [`DialError::Connect`] for any transport/encoding error.
730 pub async fn complete_link(
731 &self,
732 code: &str,
733 identity: ExternalIdentity,
734 ) -> Result<LinkCeremony, DialError> {
735 let request = CompleteLinkRequest {
736 code: code.to_owned(),
737 identity: buffa::MessageField::some(identity),
738 ..Default::default()
739 };
740 let reply = self
741 .client
742 .complete_link_with_options(request, traced_options())
743 .await?
744 .into_owned();
745 Ok(match reply.outcome.as_known() {
746 Some(LinkOutcome::Linked) => LinkCeremony::Linked {
747 persona_id: reply.persona_id,
748 },
749 Some(LinkOutcome::AlreadyLinked) => LinkCeremony::AlreadyLinked {
750 persona_id: reply.persona_id,
751 },
752 // Fused on purpose — see `LinkCeremony::InvalidOrExpired`.
753 Some(LinkOutcome::InvalidCode | LinkOutcome::Expired) => LinkCeremony::InvalidOrExpired,
754 Some(LinkOutcome::Throttled) => LinkCeremony::Throttled,
755 Some(LinkOutcome::Unspecified) | None => LinkCeremony::Failed,
756 })
757 }
758
759 /// Read-only: the profile the `identity` resolves to, as an edge-friendly
760 /// [`PersonaView`]. Returns an all-empty view for an identity not yet in
761 /// the directory; **never provisions** a persona.
762 ///
763 /// # Errors
764 /// Returns [`DialError::Connect`] for any transport/encoding error.
765 pub async fn describe(&self, identity: ExternalIdentity) -> Result<PersonaView, DialError> {
766 let request = DescribeRequest {
767 identity: buffa::MessageField::some(identity),
768 ..Default::default()
769 };
770 let reply = self
771 .client
772 .describe_with_options(request, traced_options())
773 .await?
774 .into_owned();
775 // Unknown identity → empty profile → an all-default view (the
776 // not-yet-claimed dashboard state).
777 let Some(profile) = reply.profile.into_option() else {
778 return Ok(PersonaView {
779 persona_id: reply.persona_id,
780 ..Default::default()
781 });
782 };
783 Ok(PersonaView {
784 persona_id: reply.persona_id,
785 status: profile.status,
786 identities: profile
787 .identities
788 .into_iter()
789 .map(|id| LinkedIdentity {
790 provider: id.provider,
791 external_id: id.external_id,
792 display_name: id.display_name,
793 })
794 .collect(),
795 })
796 }
797}
798
/// A single external identity attached to a persona, in the shape the
/// dashboard renders (provider · display name · id).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LinkedIdentity {
    /// The edge namespace ("slack", "telegram", …).
    pub provider: String,
    /// The stable id native to that provider.
    pub external_id: String,
    /// The display name as last observed; presentation only.
    pub display_name: String,
}
810
811/// An edge-friendly read of a persona, for surfaces like the App Home
812/// dashboard. Every field is empty when the queried identity is not in the
813/// directory (a not-yet-claimed user).
814#[derive(Debug, Clone, PartialEq, Eq, Default)]
815pub struct PersonaView {
816 /// The persona id; empty when the identity is unknown.
817 pub persona_id: String,
818 /// "provisional" | "linked" | "merged"; empty when unknown.
819 pub status: String,
820 /// Every identity currently linked to the persona.
821 pub identities: Vec<LinkedIdentity>,
822}
823
824/// A turn-input message, re-exported so callers can build attributed
825/// multi-party input for [`AgentDialer::run_turn_streaming_messages`] without
826/// depending on `polyc-proto`.
827pub use polyc_proto::proto::polychrome::agent::v1::Message as TurnMessage;
828/// The attributed transcript line type the participation gate consumes.
829/// Re-exported so callers build requests without importing `polyc-proto`.
830pub use polyc_proto::proto::polychrome::agent::v1::ParticipantMessage as GateMessage;
831/// A settled inbound payment receipt, re-exported so the public-edge layer can
832/// thread it into [`AgentDialer::run_turn_streaming_messages_with`]
833/// without depending on `polyc-proto` directly.
834pub use polyc_proto::proto::polychrome::agent::v1::PaymentReceipt;
835
836/// The external identity of a human observed at an edge (re-exported wire
837/// type, shared with the persona store records): the per-edge `EdgeAdapter`
838/// mapping produces these and the control plane resolves them to durable
839/// personas.
840pub use polyc_proto::proto::polychrome::persona::v1::ExternalIdentity;
841
/// Caller attribution for one turn.
///
/// Carries the identity whose message triggered the turn (`caller`) and any
/// other humans whose messages entered the turn's input (`participants`).
/// Edges without caller identity pass the default, which is `caller: None`
/// with an empty `participants` list.
#[derive(Debug, Clone, Default)]
pub struct Attribution {
    /// The identity whose message triggered the turn; `None` when the edge
    /// has no caller identity to report.
    pub caller: Option<ExternalIdentity>,
    /// Other humans whose messages entered this turn's input.
    pub participants: Vec<ExternalIdentity>,
}
854
855/// Build an attributed user-role turn message rendered as `"<speaker>: text"`,
856/// so the model sees who said what in a multi-party thread.
857#[must_use]
858pub fn attributed_message(speaker: &str, text: &str) -> TurnMessage {
859 text_message("user", &format!("{speaker}: {text}"))
860}
861
862/// Build a plain user-role turn message (no speaker attribution) — used for
863/// 1:1 DMs where there is only one human in the conversation.
864#[must_use]
865pub fn user_message(text: &str) -> TurnMessage {
866 text_message("user", text)
867}
868
/// Build the documented **readable** namespaced conversation id,
/// `"{namespace}:{native_id}"`.
///
/// Each edge derives [`AgentRequest::conversation_id`] from its own native
/// unit (a thread, a ticket, an issue, a session). By convention the id is a
/// namespace prefix naming the edge family, then the edge's stable handle for
/// the conversation — e.g. `github:owner/repo#42`, `web:{session-uuid}`,
/// `mcp:{caller-id}`. The orchestration core treats the id as opaque (it is
/// the event-log partition key, `conv-{id}`); only the *format* is shared, so
/// edges stay consistent and ids stay greppable.
///
/// Neither argument is validated or escaped: the two are joined with a single
/// `':'` exactly as given.
///
/// Edges whose native coordinate is unwieldy or sensitive — many chat
/// platforms — should instead use [`hashed_conversation_id`], which collapses
/// the coordinate into a fixed-length opaque `UUIDv5` under a pinned namespace.
#[must_use]
pub fn namespaced_id(namespace: &str, native_id: &str) -> String {
    [namespace, native_id].join(":")
}
887
888/// Derive a stable, fixed-length conversation id by hashing `parts` into a
889/// `UUIDv5` under a pinned `namespace`.
890///
891/// This is the "opaque hash" namespacing policy: a deterministic,
892/// stateless-across-restarts id (no `native → id` table to keep) that is
893/// globally unique enough to key the event-log partition. `parts` are joined
894/// with `':'` before hashing, so a caller passing `["T1", "C1", "169…"]`
895/// hashes exactly `"T1:C1:169…"`.
896///
897/// The pinned `namespace` UUID **must never change** for a given edge — every
898/// existing conversation id for that edge depends on it. `polychrome-slack`
899/// uses this policy (its `UUIDv5` over `(team, channel, thread_ts)`); new edges
900/// pick their own frozen namespace.
901#[must_use]
902pub fn hashed_conversation_id(namespace: uuid::Uuid, parts: &[&str]) -> String {
903 let joined = parts.join(":");
904 uuid::Uuid::new_v5(&namespace, joined.as_bytes())
905 .hyphenated()
906 .to_string()
907}
908
909/// Collision-free variant of [`hashed_conversation_id`] for edges whose native
910/// parts can themselves contain `':'` (e.g. an email `Message-ID`, a URL, a
911/// path).
912///
913/// [`hashed_conversation_id`] joins parts with a bare `':'`, so `["a:b", "c"]`
914/// and `["a", "b:c"]` both hash `"a:b:c"` and collide. This helper instead
915/// **length-prefixes** each part (`"{len}:{part}"`), which is unambiguous
916/// regardless of any `':'` inside a part — the length says exactly how many
917/// bytes the part occupies, so distinct part boundaries always produce distinct
918/// hash inputs. Use it for any edge whose coordinate fields are not guaranteed
919/// `':'`-free.
920///
921/// Distinct from [`hashed_conversation_id`] precisely because the framing
922/// differs; the two do not agree for the same `parts`.
923#[must_use]
924pub fn framed_conversation_id(namespace: uuid::Uuid, parts: &[&str]) -> String {
925 let mut framed = String::new();
926 for p in parts {
927 framed.push_str(&p.len().to_string());
928 framed.push(':');
929 framed.push_str(p);
930 }
931 uuid::Uuid::new_v5(&namespace, framed.as_bytes())
932 .hyphenated()
933 .to_string()
934}
935
936/// Build the single [`AgentRequest`] that opens an `AgentService.connect`
937/// stream. Shared by [`AgentDialer::run_turn`] and
938/// [`AgentDialer::run_turn_streaming`] so the two paths can't drift.
939fn build_request(
940 conversation_id: &str,
941 exec_id: &str,
942 messages: Vec<Message>,
943 payment_receipt: Option<PaymentReceipt>,
944 attribution: Attribution,
945) -> AgentRequest {
946 AgentRequest {
947 conversation_id: conversation_id.to_owned(),
948 exec_id: exec_id.to_owned(),
949 start: buffa::MessageField::some(AgentStart {
950 agent_id: String::new(),
951 agent_config: Vec::new(),
952 messages,
953 payment_receipt: payment_receipt
954 .map_or_else(buffa::MessageField::none, buffa::MessageField::some),
955 caller: attribution
956 .caller
957 .map_or_else(buffa::MessageField::none, buffa::MessageField::some),
958 participants: attribution.participants,
959 ..Default::default()
960 }),
961 ..Default::default()
962 }
963}
964
965/// Project a terminal [`AgentEnd`] into the [`TurnEvent`]s a live surface sees
966/// at end-of-turn: one [`TurnEvent::ApprovalPending`] per paused tool call,
967/// then a [`TurnEvent::HandoffStarted`] if the turn delegated to a sub-agent,
968/// then the terminal [`TurnEvent::Done`].
969///
970/// Pure (no I/O) so the End-arm mapping can be unit-tested without a live
971/// stream, mirroring [`message_to_event`] for the Outputs arm.
972fn events_from_end(end: AgentEnd) -> Vec<TurnEvent> {
973 let mut events: Vec<TurnEvent> = end
974 .pending_approvals
975 .into_iter()
976 .map(|pa| TurnEvent::ApprovalPending {
977 request_id: pa.request_id,
978 tool_name: pa.tool_name,
979 title: pa.title,
980 args_json: pa.args_json,
981 })
982 .collect();
983 if let Some(h) = end.handoff.into_option() {
984 events.push(TurnEvent::HandoffStarted {
985 child_agent_id: h.child_agent_id,
986 reason: h.reason,
987 });
988 }
989 events.push(TurnEvent::Done);
990 events
991}
992
993/// Map one output [`Message`] to an optional [`TurnEvent`] for the streaming
994/// path.
995///
996/// Pure (no I/O) so the mapping can be unit-tested without a live stream:
997///
998/// - A tool-call content block, or any tool-role message, becomes a
999/// [`TurnEvent::ToolStarted`] named for the called function (falling back to
1000/// the call id when the function name is absent).
1001/// - A non-empty text block on a model/assistant-role message becomes a
1002/// [`TurnEvent::TextDelta`].
1003/// - Tool-role text (a tool-result echo) and empty/non-textual blocks produce
1004/// no event.
1005fn message_to_event(msg: Message) -> Option<TurnEvent> {
1006 let content_block = msg.content.into_option()?;
1007 match content_block.r#type? {
1008 content::Type::ToolCall(tc) => {
1009 let name = match tc.r#type.as_ref() {
1010 Some(tool_call_content::Type::FunctionCall(fc)) if !fc.name.is_empty() => {
1011 fc.name.clone()
1012 }
1013 _ => tc.id.clone(),
1014 };
1015 Some(TurnEvent::ToolStarted { name })
1016 }
1017 content::Type::Text(t) => {
1018 if is_assistant_role(&msg.role) && !t.text.is_empty() {
1019 Some(TurnEvent::TextDelta(t.text))
1020 } else {
1021 // Tool-role text is an intermediate result echo; empty text and
1022 // non-assistant roles carry nothing the user should see.
1023 None
1024 }
1025 }
1026 // Thoughts, tool results, and media variants have no streaming event in
1027 // this version.
1028 _ => None,
1029 }
1030}
1031
/// Report whether `role` marks a message as the assistant's own answer text.
///
/// The control plane uses `model` (provider-native) and `assistant`
/// interchangeably for the agent's replies, so both are accepted. The match
/// is exact and case-sensitive; any other role yields `false`.
fn is_assistant_role(role: &str) -> bool {
    matches!(role, "model" | "assistant")
}
1039
1040/// Render one [`content::Type`] variant as user-visible text.
1041///
1042/// Returns `None` for variants that have no useful textual representation
1043/// (image/audio/document/video/confirmation) so the `join("\n")` in
1044/// [`AgentDialer::run_turn`] doesn't emit stray blank lines.
1045///
1046/// Tool calls are rendered (rather than skipped) so a turn that ends on a
1047/// `tool_call` with no follow-up text — e.g. a HITL pause or approval-only
1048/// confirmation flow — gives the user something to see instead of an empty
1049/// body.
1050fn render_content(ty: Option<content::Type>) -> Option<String> {
1051 match ty? {
1052 content::Type::Text(t) => {
1053 if t.text.is_empty() {
1054 None
1055 } else {
1056 Some(t.text)
1057 }
1058 }
1059 content::Type::ToolCall(tc) => {
1060 let name = match tc.r#type.as_ref() {
1061 Some(tool_call_content::Type::FunctionCall(fc)) => fc.name.as_str(),
1062 None => "",
1063 };
1064 if name.is_empty() {
1065 Some(format!("[tool_call:{}]", tc.id))
1066 } else {
1067 Some(format!("[tool_call:{name} {}]", tc.id))
1068 }
1069 }
1070 content::Type::ToolResult(tr) => Some(format!("[tool_result:{}]", tr.call_id)),
1071 content::Type::Thought(t) => {
1072 // ThoughtContent has no top-level text; its `summary` repeated
1073 // field carries the user-visible reasoning when the provider
1074 // emits one. Collect any TextContent summaries; skip otherwise.
1075 let mut buf = String::new();
1076 for s in t.summary {
1077 if let Some(thought_summary_content::Type::Text(text)) = s.r#type
1078 && !text.text.is_empty()
1079 {
1080 if !buf.is_empty() {
1081 buf.push(' ');
1082 }
1083 buf.push_str(&text.text);
1084 }
1085 }
1086 if buf.is_empty() {
1087 None
1088 } else {
1089 Some(format!("[thought] {buf}"))
1090 }
1091 }
1092 // image / audio / document / video / confirmation — no useful text
1093 // rendering for v1.
1094 _ => None,
1095 }
1096}
1097
#[cfg(test)]
mod tests {
    // Unit tests for this file's pure helpers, all driven from in-memory
    // values: error classification, content rendering, stream-event mapping,
    // end-of-turn projection, conversation-id derivation, and link-outcome
    // messages.
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]
    use super::*;
    use polyc_proto::proto::polychrome::agent::v1::{
        Content, FunctionCallContent, TextContent, ThoughtContent, ThoughtSummaryContent,
        ToolCallContent, ToolResultContent,
    };

    // Pins which Connect codes `DialError::is_retryable` treats as transient,
    // and that `DialError::code` surfaces the code only for Connect errors.
    #[test]
    fn dial_error_retryable_classification() {
        use connectrpc::ErrorCode;
        // Transient transport conditions → retryable (ask for redelivery).
        for code in [
            ErrorCode::Unavailable,
            ErrorCode::DeadlineExceeded,
            ErrorCode::ResourceExhausted,
            ErrorCode::Aborted,
        ] {
            let err = DialError::Connect(connectrpc::ConnectError::new(code, "x"));
            assert_eq!(err.code(), Some(code));
            assert!(err.is_retryable(), "{code:?} should be retryable");
        }
        // Terminal codes → NOT retryable (redelivery would loop forever).
        for code in [
            ErrorCode::InvalidArgument,
            ErrorCode::Unauthenticated,
            ErrorCode::NotFound,
            ErrorCode::PermissionDenied,
            ErrorCode::Internal,
        ] {
            let err = DialError::Connect(connectrpc::ConnectError::new(code, "x"));
            assert_eq!(err.code(), Some(code));
            assert!(!err.is_retryable(), "{code:?} should not be retryable");
        }
        // Local failures carry no Connect code and are never retryable.
        let bad_addr = DialError::InvalidAddress {
            addr: "http://a b".to_owned(),
            // The embedded space makes the URI parse fail, yielding a real
            // parse error to store as the source.
            source: "http://a b".parse::<http::Uri>().unwrap_err(),
        };
        assert_eq!(bad_addr.code(), None);
        assert!(!bad_addr.is_retryable());
        let tls = DialError::Tls("no provider".to_owned());
        assert_eq!(tls.code(), None);
        assert!(!tls.is_retryable());
    }

    // --- content-block fixtures ---

    // A text content block holding `s`.
    fn text(s: &str) -> Option<content::Type> {
        Some(content::Type::Text(Box::new(TextContent {
            text: s.to_owned(),
            ..Default::default()
        })))
    }

    // A tool-call content block with call id `id` invoking function `name`.
    fn tool_call(id: &str, name: &str) -> Option<content::Type> {
        Some(content::Type::ToolCall(Box::new(ToolCallContent {
            id: id.to_owned(),
            r#type: Some(tool_call_content::Type::FunctionCall(Box::new(
                FunctionCallContent {
                    name: name.to_owned(),
                    ..Default::default()
                },
            ))),
            ..Default::default()
        })))
    }

    // A tool-result content block answering call `call_id`.
    fn tool_result(call_id: &str) -> Option<content::Type> {
        Some(content::Type::ToolResult(Box::new(ToolResultContent {
            call_id: call_id.to_owned(),
            ..Default::default()
        })))
    }

    // A thought content block with a single text summary.
    fn thought(summary: &str) -> Option<content::Type> {
        Some(content::Type::Thought(Box::new(ThoughtContent {
            summary: vec![ThoughtSummaryContent {
                r#type: Some(thought_summary_content::Type::Text(Box::new(TextContent {
                    text: summary.to_owned(),
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            ..Default::default()
        })))
    }

    // --- end-of-turn rendering (render_content) ---

    #[test]
    fn renders_text_verbatim() {
        assert_eq!(render_content(text("hi")), Some("hi".to_owned()));
    }

    #[test]
    fn renders_tool_call_with_name_and_id() {
        assert_eq!(
            render_content(tool_call("call_42", "search")),
            Some("[tool_call:search call_42]".to_owned())
        );
    }

    // With no function-call payload at all, only the call id is rendered.
    #[test]
    fn renders_tool_call_without_function_name() {
        assert_eq!(
            render_content(Some(content::Type::ToolCall(Box::new(ToolCallContent {
                id: "call_bare".to_owned(),
                r#type: None,
                ..Default::default()
            })))),
            Some("[tool_call:call_bare]".to_owned())
        );
    }

    #[test]
    fn renders_tool_result_by_call_id() {
        assert_eq!(
            render_content(tool_result("call_42")),
            Some("[tool_result:call_42]".to_owned())
        );
    }

    #[test]
    fn renders_thought_summary() {
        assert_eq!(
            render_content(thought("considering options")),
            Some("[thought] considering options".to_owned())
        );
    }

    #[test]
    fn empty_text_skipped() {
        assert_eq!(render_content(text("")), None);
    }

    #[test]
    fn unknown_variant_skipped() {
        // No content::Type set at all.
        assert_eq!(render_content(None), None);
    }

    // Helper used by aggregation tests: feed the same rendering loop
    // run_turn uses, but driven from a vector of fake content blocks rather
    // than a live connectrpc stream.
    fn aggregate(blocks: Vec<Option<content::Type>>) -> String {
        let mut parts: Vec<String> = Vec::new();
        for b in blocks {
            if let Some(s) = render_content(b) {
                parts.push(s);
            }
        }
        parts.join("\n")
    }

    #[test]
    fn aggregate_pure_text_turn() {
        assert_eq!(
            aggregate(vec![text("hello"), text("world")]),
            "hello\nworld"
        );
    }

    #[test]
    fn aggregate_tool_call_only_turn() {
        // A turn that ends in a tool call with no follow-up text used to
        // come back as "" and was silently dropped by the Slack handler.
        // Rendering a placeholder keeps the user informed.
        assert_eq!(
            aggregate(vec![tool_call("call_1", "lookup")]),
            "[tool_call:lookup call_1]"
        );
    }

    #[test]
    fn aggregate_mixed_text_and_tool_call() {
        assert_eq!(
            aggregate(vec![text("thinking..."), tool_call("call_1", "search")]),
            "thinking...\n[tool_call:search call_1]"
        );
    }

    // --- streaming mapping (message_to_event) ---

    // A message with the given role wrapping one content block.
    fn message(role: &str, ty: Option<content::Type>) -> Message {
        Message {
            role: role.to_owned(),
            content: buffa::MessageField::some(Content {
                r#type: ty,
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    #[test]
    fn assistant_text_becomes_text_delta() {
        assert_eq!(
            message_to_event(message("assistant", text("hello"))),
            Some(TurnEvent::TextDelta("hello".to_owned()))
        );
    }

    #[test]
    fn model_role_also_counts_as_assistant() {
        assert_eq!(
            message_to_event(message("model", text("hi"))),
            Some(TurnEvent::TextDelta("hi".to_owned()))
        );
    }

    #[test]
    fn tool_role_text_is_skipped() {
        // A tool-result echo carried as text must not surface as answer text.
        assert_eq!(message_to_event(message("tool", text("result blob"))), None);
    }

    #[test]
    fn empty_assistant_text_is_skipped() {
        assert_eq!(message_to_event(message("assistant", text(""))), None);
    }

    #[test]
    fn tool_call_becomes_tool_started_with_name() {
        assert_eq!(
            message_to_event(message("model", tool_call("call_7", "search"))),
            Some(TurnEvent::ToolStarted {
                name: "search".to_owned()
            })
        );
    }

    // Without a function-call payload the event is named after the call id.
    #[test]
    fn tool_call_falls_back_to_call_id() {
        assert_eq!(
            message_to_event(message(
                "tool",
                Some(content::Type::ToolCall(Box::new(ToolCallContent {
                    id: "call_bare".to_owned(),
                    r#type: None,
                    ..Default::default()
                })))
            )),
            Some(TurnEvent::ToolStarted {
                name: "call_bare".to_owned()
            })
        );
    }

    #[test]
    fn tool_result_produces_no_event() {
        assert_eq!(
            message_to_event(message("tool", tool_result("call_7"))),
            None
        );
    }

    // Drives the same per-message mapping the streaming loop uses, over a
    // synthetic turn, then appends Done as the End arm would.
    fn map_turn(msgs: Vec<Message>) -> Vec<TurnEvent> {
        let mut events: Vec<TurnEvent> = msgs.into_iter().filter_map(message_to_event).collect();
        events.push(TurnEvent::Done);
        events
    }

    #[test]
    fn synthetic_turn_yields_expected_event_sequence() {
        // model Text delta, a tool-role ToolCall, another model Text, End.
        let turn = vec![
            message("model", text("Let me look that up.")),
            message("tool", tool_call("call_1", "search")),
            message("model", text("Found it.")),
        ];
        assert_eq!(
            map_turn(turn),
            vec![
                TurnEvent::TextDelta("Let me look that up.".to_owned()),
                TurnEvent::ToolStarted {
                    name: "search".to_owned()
                },
                TurnEvent::TextDelta("Found it.".to_owned()),
                TurnEvent::Done,
            ]
        );
    }

    // --- terminal-envelope projection (events_from_end) ---

    // Aliased so the wire types cannot be confused with any same-named items
    // pulled in by `use super::*`.
    use polyc_proto::proto::polychrome::agent::v1::{
        Handoff as WireHandoff, PendingApproval as WirePendingApproval,
    };

    #[test]
    fn end_with_nothing_yields_only_done() {
        assert_eq!(events_from_end(AgentEnd::default()), vec![TurnEvent::Done]);
    }

    #[test]
    fn end_with_handoff_yields_handoff_then_done() {
        let end = AgentEnd {
            handoff: buffa::MessageField::some(WireHandoff {
                call_id: "call_1".to_owned(),
                child_agent_id: "researcher".to_owned(),
                reason: "needs deep dive".to_owned(),
                ..Default::default()
            }),
            ..Default::default()
        };
        // The handoff's `call_id` is not part of the projected event.
        assert_eq!(
            events_from_end(end),
            vec![
                TurnEvent::HandoffStarted {
                    child_agent_id: "researcher".to_owned(),
                    reason: "needs deep dive".to_owned(),
                },
                TurnEvent::Done,
            ]
        );
    }

    #[test]
    fn end_orders_approvals_before_handoff_before_done() {
        let end = AgentEnd {
            pending_approvals: vec![WirePendingApproval {
                request_id: "r1".to_owned(),
                tool_name: "delete_file".to_owned(),
                args_json: "{}".to_owned(),
                title: "Delete a file".to_owned(),
                ..Default::default()
            }],
            handoff: buffa::MessageField::some(WireHandoff {
                child_agent_id: "child".to_owned(),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(
            events_from_end(end),
            vec![
                TurnEvent::ApprovalPending {
                    request_id: "r1".to_owned(),
                    tool_name: "delete_file".to_owned(),
                    title: "Delete a file".to_owned(),
                    args_json: "{}".to_owned(),
                },
                TurnEvent::HandoffStarted {
                    child_agent_id: "child".to_owned(),
                    reason: String::new(),
                },
                TurnEvent::Done,
            ]
        );
    }

    // --- namespaced conversation ids ---

    #[test]
    fn namespaced_id_is_prefix_colon_native() {
        assert_eq!(
            namespaced_id("github", "owner/repo#42"),
            "github:owner/repo#42"
        );
        assert_eq!(namespaced_id("web", "abc-123"), "web:abc-123");
    }

    // Same inputs give the same id, a changed part gives a different id, and
    // the output parses back as a version-5 UUID.
    #[test]
    fn hashed_conversation_id_is_deterministic_v5() {
        let ns = uuid::Uuid::from_u128(0x1234_5678_9abc_4def_8123_4567_89ab_cdef);
        let a = hashed_conversation_id(ns, &["T1", "C1", "169.000"]);
        let b = hashed_conversation_id(ns, &["T1", "C1", "169.000"]);
        assert_eq!(a, b);
        assert_ne!(a, hashed_conversation_id(ns, &["T1", "C2", "169.000"]));
        let parsed: uuid::Uuid = a.parse().unwrap();
        assert_eq!(parsed.get_version_num(), 5);
    }

    #[test]
    fn framed_conversation_id_resists_separator_collisions() {
        let ns = uuid::Uuid::from_u128(0x99);
        // The exact case that collides under the bare-join helper.
        assert_ne!(
            framed_conversation_id(ns, &["a:b", "c"]),
            framed_conversation_id(ns, &["a", "b:c"]),
        );
        // Sanity: the bare-join helper DOES collide here (documents why framed exists).
        assert_eq!(
            hashed_conversation_id(ns, &["a:b", "c"]),
            hashed_conversation_id(ns, &["a", "b:c"]),
        );
        // Deterministic + valid v5.
        let id = framed_conversation_id(ns, &["mail", "<abc@x>"]);
        assert_eq!(id, framed_conversation_id(ns, &["mail", "<abc@x>"]));
        assert_eq!(id.parse::<uuid::Uuid>().unwrap().get_version_num(), 5);
    }

    #[test]
    fn hashed_conversation_id_matches_slacks_inline_algorithm() {
        // Golden cross-check: the SDK helper must reproduce exactly what the
        // Slack edge used to compute inline (UUIDv5 of "team:channel:thread"
        // under a pinned namespace), so delegating in `polychrome-slack`
        // changes no existing conversation id.
        let ns = uuid::Uuid::from_u128(0xa1b2_c3d4_e5f6_4789_abcd_ef01_2345_6789);
        let parts = ["T01234ABCD", "C0B5V0JA401", "1700000000.000100"];
        let inline = uuid::Uuid::new_v5(&ns, parts.join(":").as_bytes())
            .hyphenated()
            .to_string();
        assert_eq!(hashed_conversation_id(ns, &parts), inline);
    }

    // --- link-ceremony user messages ---

    #[test]
    fn link_outcome_messages_honor_presentation_rules() {
        // INVALID and EXPIRED are fused upstream into one variant, so neither
        // edge can distinguish them — one shared message, no oracle.
        assert!(
            LinkCeremony::InvalidOrExpired
                .user_message()
                .contains("invalid or expired")
        );
        // THROTTLED reads distinctly (wait, don't retry).
        assert!(LinkCeremony::Throttled.user_message().contains("wait"));
        assert!(
            LinkCeremony::Linked {
                persona_id: "p".to_owned()
            }
            .user_message()
            .contains("Linked")
        );
    }
}
1523}