Skip to main content

heddle_client/
presence.rs

1//! Local-agent presence publisher (Track B).
2//!
3//! Runs in the foreground: reads the configured hosted upstream + namespace
4//! from `.heddle/config.toml`, resolves a bearer token from the user config or
5//! `HEDDLE_REMOTE_TOKEN`, opens a WebSocket to `<upstream>/presence/ws`, and
6//! streams `agent_start` → periodic `agent_heartbeat` → `agent_done` events.
7//!
8//! This module is gated on the `client` feature because the WebSocket
9//! client (and therefore `tokio-tungstenite`) is only pulled in for hosted
10//! builds.
11//!
12//! # Scope
13//!
14//! - No daemonization, no PID stashing, no signal handling beyond Ctrl-C.
15//! - Orchestrators (e.g. Claude Code's hook system, or `heddle actor spawn`)
16//!   are expected to invoke this subcommand themselves; there is no
17//!   automatic launch on actor registration.
18//! - Tokens are fetched once at startup. Long-running agents should use
19//!   device-bound tokens (30-day TTL minted via `heddle auth login`). On
20//!   `Unauthorized` mid-stream the publisher exits cleanly and logs a
21//!   pointer at re-running `heddle auth login` — no in-band refresh.
22
23use std::{path::Path, sync::Arc, time::Duration};
24
25use anyhow::{Context, Result, anyhow};
26use cli_shared::UserConfig;
27use futures::{SinkExt, StreamExt};
28use objects::store::{AgentEntry, AgentRegistry};
29use repo::{HostedConfig, Repository};
30use serde::{Deserialize, Serialize};
31use tokio::{
32    select,
33    sync::Mutex,
34    time::{self, Instant},
35};
36use tokio_tungstenite::{
37    connect_async,
38    tungstenite::{
39        client::IntoClientRequest,
40        http::header::AUTHORIZATION,
41        protocol::{CloseFrame, Message, frame::coding::CloseCode},
42    },
43};
44use tracing::{debug, info, warn};
45use weft_client_shim::CliContext;
46
47use crate::credentials;
48
49/// Local mirror of `weft_server::presence::hub::PresenceEvent`.
50///
51/// Kept in sync with the server definition so we don't add a
52/// `cli → server` dep (which would pull in axum + sqlx into the
53/// CLI build). The shape is small and stable — if either side evolves, a
54/// compile-time reminder should land via a failing integration test.
55// The `Agent` prefix is load-bearing: variant names mirror
56// `weft_server::presence::hub::PresenceEvent` exactly so the mirror
57// invariant documented above holds. Renaming would drift the two sides
58// and make any future grep-based refactor miss this side.
59#[allow(clippy::enum_variant_names)]
60#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
61#[serde(tag = "kind", rename_all = "snake_case")]
62enum PresenceEvent {
63    #[serde(rename = "agent_start")]
64    Start {
65        session_id: String,
66        subject: String,
67        namespace: String,
68        #[serde(default, skip_serializing_if = "Option::is_none")]
69        model: Option<String>,
70        #[serde(default, skip_serializing_if = "Option::is_none")]
71        provider: Option<String>,
72        #[serde(default, skip_serializing_if = "Option::is_none")]
73        cursor_path: Option<String>,
74        started_at_ms: u64,
75    },
76    #[serde(rename = "agent_heartbeat")]
77    Heartbeat {
78        session_id: String,
79        subject: String,
80        namespace: String,
81        #[serde(default, skip_serializing_if = "Option::is_none")]
82        cursor_path: Option<String>,
83        #[serde(default, skip_serializing_if = "Vec::is_empty")]
84        recent_actions: Vec<String>,
85        ts_ms: u64,
86    },
87    #[serde(rename = "agent_done")]
88    Done {
89        session_id: String,
90        subject: String,
91        namespace: String,
92        ts_ms: u64,
93    },
94}
95
96#[derive(Debug, Serialize)]
97#[serde(tag = "type", rename_all = "snake_case")]
98enum ClientFrame<'a> {
99    Hello { role: &'a str },
100    Publish { event: &'a PresenceEvent },
101}
102
103#[derive(Debug, Deserialize)]
104#[serde(tag = "type", rename_all = "snake_case")]
105enum ServerFrame {
106    Ready {
107        #[allow(dead_code)]
108        subscribed: Vec<String>,
109    },
110    Event {
111        #[allow(dead_code)]
112        ts_ms: u64,
113        #[allow(dead_code)]
114        event: serde_json::Value,
115    },
116    Error {
117        code: String,
118        message: String,
119    },
120}
121
122/// Configuration resolved from the repository + user configs + CLI flags.
123#[derive(Debug, Clone)]
124pub struct PublisherConfig {
125    pub session_id: String,
126    pub subject: String,
127    pub namespace: String,
128    pub model: Option<String>,
129    pub provider: Option<String>,
130    pub token: String,
131    pub ws_url: String,
132    pub interval: Duration,
133}
134
135/// Entry point used by `main.rs`.
136pub async fn cmd_presence_publish(
137    ctx: &dyn CliContext,
138    session: String,
139    interval_secs: u64,
140) -> Result<()> {
141    let repo_root = match ctx.repo_path() {
142        Some(p) => p.to_path_buf(),
143        None => std::env::current_dir()?,
144    };
145    let repo = Repository::open(&repo_root).with_context(|| {
146        format!(
147            "failed to open Heddle repository at {}",
148            repo_root.display()
149        )
150    })?;
151
152    let hosted = repo.config().hosted.clone();
153    let agent = load_agent_entry(repo.heddle_dir(), &session)?;
154
155    let user_config = UserConfig::load_default()?;
156
157    match resolve_publisher_config(
158        &hosted,
159        &agent,
160        &user_config,
161        Duration::from_secs(interval_secs),
162    )? {
163        Some(config) => run_publisher(config).await,
164        None => Err(anyhow!(
165            "hosted presence requires a repository linked to a Heddle hosted upstream. Configure [hosted] in .heddle/config.toml or use a hosted-enabled repository."
166        )),
167    }
168}
169
170fn load_agent_entry(heddle_dir: &Path, session: &str) -> Result<AgentEntry> {
171    let registry = AgentRegistry::new(heddle_dir);
172    registry
173        .load(session)
174        .with_context(|| format!("failed to read agent registry for session '{session}'"))?
175        .ok_or_else(|| {
176            anyhow!(
177                "no agent entry found for session '{session}' at {}",
178                heddle_dir
179                    .join("agents")
180                    .join(format!("{session}.toml"))
181                    .display()
182            )
183        })
184}
185
186/// Resolve everything the publisher loop needs up front.
187///
188/// Returns `Ok(None)` when the repository is local-only (no upstream or
189/// namespace configured). Returns `Err` only for unrecoverable setup problems
190/// (missing token, malformed URL, etc).
191pub fn resolve_publisher_config(
192    hosted: &HostedConfig,
193    agent: &AgentEntry,
194    user_config: &UserConfig,
195    interval: Duration,
196) -> Result<Option<PublisherConfig>> {
197    let (Some(upstream), Some(namespace)) = (
198        hosted.upstream_url.as_deref().filter(|s| !s.is_empty()),
199        hosted.namespace.as_deref().filter(|s| !s.is_empty()),
200    ) else {
201        return Ok(None);
202    };
203
204    let (token, credential_subject) = if let Some(token) = user_config.remote_token()? {
205        (token.id, None)
206    } else {
207        let stored_credential = credentials::resolve_credential_for_server(upstream)
208            .with_context(|| format!("loading stored credential for {upstream}"))?;
209        if let Some(credential) = stored_credential {
210            (credential.token, Some(credential.subject))
211        } else {
212            return Err(anyhow!(
213                "no remote token available — set HEDDLE_REMOTE_TOKEN or run `heddle auth login`"
214            ));
215        }
216    };
217
218    let ws_url = normalize_ws_url(upstream)?;
219
220    // Biscuit tokens are intentionally opaque to the CLI. Use the configured
221    // principal as the subject we publish, and let the server validate it
222    // against the authenticated Biscuit facts.
223    let subject = user_config
224        .principal
225        .as_ref()
226        .map(|p| p.email.clone())
227        .or(credential_subject)
228        .ok_or_else(|| {
229            anyhow!("could not derive subject from principal config — run `heddle auth login`")
230        })?;
231
232    Ok(Some(PublisherConfig {
233        session_id: agent.session_id.clone(),
234        subject,
235        namespace: namespace.to_string(),
236        model: agent.model.clone(),
237        provider: agent.provider.clone(),
238        token,
239        ws_url,
240        interval,
241    }))
242}
243
244/// Normalise an upstream URL into a `ws(s)://…/presence/ws` target.
245///
246/// Accepts any of: `https://host`, `http://host:8421`, `ws://host`,
247/// `wss://host/path` (with or without trailing slash). The path is
248/// replaced with `/presence/ws`.
249fn normalize_ws_url(upstream: &str) -> Result<String> {
250    let trimmed = upstream.trim_end_matches('/');
251    let (scheme_end, ws_scheme) = if let Some(rest) = trimmed.strip_prefix("https://") {
252        (trimmed.len() - rest.len(), "wss://")
253    } else if let Some(rest) = trimmed.strip_prefix("http://") {
254        (trimmed.len() - rest.len(), "ws://")
255    } else if trimmed.starts_with("ws://") || trimmed.starts_with("wss://") {
256        return Ok(strip_path_and_append_presence(trimmed));
257    } else {
258        return Err(anyhow!(
259            "unsupported upstream URL '{upstream}' (expected http(s):// or ws(s)://)"
260        ));
261    };
262    // Strip any path component from `<scheme>host[:port]/…`.
263    let host_start = scheme_end;
264    let after = &trimmed[host_start..];
265    let host = match after.find('/') {
266        Some(idx) => &after[..idx],
267        None => after,
268    };
269    Ok(format!("{ws_scheme}{host}/presence/ws"))
270}
271
272fn strip_path_and_append_presence(url: &str) -> String {
273    let (scheme, rest) = if let Some(r) = url.strip_prefix("wss://") {
274        ("wss://", r)
275    } else if let Some(r) = url.strip_prefix("ws://") {
276        ("ws://", r)
277    } else {
278        return url.to_string();
279    };
280    let host = rest.split('/').next().unwrap_or(rest);
281    format!("{scheme}{host}/presence/ws")
282}
283
284/// Main reconnect loop. Runs until Ctrl-C or an authoritative auth failure.
285pub async fn run_publisher(config: PublisherConfig) -> Result<()> {
286    let config = Arc::new(config);
287    let cancelled = Arc::new(Mutex::new(false));
288    let cancel_signal = {
289        let cancelled = cancelled.clone();
290        async move {
291            if tokio::signal::ctrl_c().await.is_ok() {
292                *cancelled.lock().await = true;
293                info!("presence: Ctrl-C received, shutting down");
294            }
295        }
296    };
297    tokio::spawn(cancel_signal);
298
299    let mut backoff = BackoffPlan::new();
300    loop {
301        if *cancelled.lock().await {
302            return Ok(());
303        }
304        match connect_and_stream(config.clone(), cancelled.clone()).await {
305            Ok(exit) => match exit {
306                LoopExit::Cancelled => return Ok(()),
307                LoopExit::Disconnected => {
308                    let delay = backoff.next();
309                    warn!(
310                        retry_in_ms = delay.as_millis() as u64,
311                        "presence: disconnected, reconnecting"
312                    );
313                    if wait_or_cancel(delay, cancelled.clone()).await {
314                        return Ok(());
315                    }
316                }
317            },
318            Err(ConnectError::Unauthorized) => {
319                // Bail cleanly; long-running agents should mint a
320                // device-bound token via `heddle auth login` (30-day TTL)
321                // to avoid hitting this mid-session.
322                warn!(
323                    "presence: authentication failed (401) — token expired or revoked. \
324                     Re-run `heddle auth login` to continue publishing. \
325                     (Device-bound tokens have a 30-day TTL and are recommended for \
326                     long-running agent sessions.)"
327                );
328                return Err(anyhow!("presence publisher: unauthorized"));
329            }
330            Err(ConnectError::Forbidden(err)) => {
331                warn!(
332                    error = %err,
333                    "presence: server returned 403 forbidden — scope mismatch or namespace \
334                     not provisioned. Not retrying.",
335                );
336                return Err(anyhow!("presence publisher: forbidden — {err}"));
337            }
338            Err(ConnectError::Fatal(err)) => {
339                return Err(err);
340            }
341            Err(ConnectError::Transient(err)) => {
342                let delay = backoff.next();
343                warn!(error = %err, retry_in_ms = delay.as_millis() as u64, "presence: transient error, reconnecting");
344                if wait_or_cancel(delay, cancelled.clone()).await {
345                    return Ok(());
346                }
347            }
348        }
349    }
350}
351
352enum LoopExit {
353    Cancelled,
354    Disconnected,
355}
356
357enum ConnectError {
358    /// HTTP 401 on connect or server reported `unauthorized` mid-stream.
359    /// Structural auth failure — don't retry without a fresh token.
360    Unauthorized,
361    /// HTTP 403 on connect or server reported `forbidden` mid-stream.
362    /// Scope/namespace mismatch — a refresh won't help.
363    Forbidden(anyhow::Error),
364    Transient(anyhow::Error),
365    Fatal(anyhow::Error),
366}
367
368impl From<anyhow::Error> for ConnectError {
369    fn from(err: anyhow::Error) -> Self {
370        ConnectError::Transient(err)
371    }
372}
373
374async fn wait_or_cancel(delay: Duration, cancelled: Arc<Mutex<bool>>) -> bool {
375    let deadline = Instant::now() + delay;
376    loop {
377        if *cancelled.lock().await {
378            return true;
379        }
380        let now = Instant::now();
381        if now >= deadline {
382            return false;
383        }
384        let step = (deadline - now).min(Duration::from_millis(250));
385        tokio::time::sleep(step).await;
386    }
387}
388
389async fn connect_and_stream(
390    config: Arc<PublisherConfig>,
391    cancelled: Arc<Mutex<bool>>,
392) -> Result<LoopExit, ConnectError> {
393    debug!(url = %config.ws_url, "presence: connecting");
394
395    let request = config
396        .ws_url
397        .as_str()
398        .into_client_request()
399        .map_err(|e| ConnectError::Fatal(anyhow!("invalid WS URL: {e}")))?;
400    let mut request = request;
401    request.headers_mut().insert(
402        AUTHORIZATION,
403        format!("Bearer {}", config.token)
404            .parse()
405            .map_err(|e| ConnectError::Fatal(anyhow!("invalid bearer token: {e}")))?,
406    );
407
408    let (ws, _resp) = match connect_async(request).await {
409        Ok(pair) => pair,
410        Err(tokio_tungstenite::tungstenite::Error::Http(resp)) if resp.status() == 401 => {
411            return Err(ConnectError::Unauthorized);
412        }
413        Err(tokio_tungstenite::tungstenite::Error::Http(resp)) if resp.status() == 403 => {
414            return Err(ConnectError::Forbidden(anyhow!(
415                "server returned 403 on WebSocket handshake"
416            )));
417        }
418        Err(err) => return Err(ConnectError::Transient(anyhow!("ws connect: {err}"))),
419    };
420
421    let (mut tx, mut rx) = ws.split();
422
423    // Hello.
424    send_client_frame(&mut tx, &ClientFrame::Hello { role: "cli" })
425        .await
426        .map_err(ConnectError::Transient)?;
427
428    // Drain the initial `ready` (or error) from the server before starting.
429    match tokio::time::timeout(Duration::from_secs(10), rx.next()).await {
430        Ok(Some(Ok(Message::Text(txt)))) => {
431            if let Ok(ServerFrame::Error { code, message }) =
432                serde_json::from_str::<ServerFrame>(txt.as_str())
433            {
434                return Err(ConnectError::Fatal(anyhow!(
435                    "server rejected hello: {code} — {message}"
436                )));
437            }
438        }
439        Ok(Some(Err(err))) => return Err(ConnectError::Transient(anyhow!("ws: {err}"))),
440        Ok(None) => return Ok(LoopExit::Disconnected),
441        Err(_) => return Err(ConnectError::Transient(anyhow!("hello timeout"))),
442        _ => {}
443    }
444
445    // Publish agent_start.
446    let start_event = PresenceEvent::Start {
447        session_id: config.session_id.clone(),
448        subject: config.subject.clone(),
449        namespace: config.namespace.clone(),
450        model: config.model.clone(),
451        provider: config.provider.clone(),
452        cursor_path: None,
453        started_at_ms: now_millis(),
454    };
455    send_client_frame(
456        &mut tx,
457        &ClientFrame::Publish {
458            event: &start_event,
459        },
460    )
461    .await
462    .map_err(ConnectError::Transient)?;
463    info!(
464        session = %config.session_id,
465        namespace = %config.namespace,
466        "presence: published agent_start"
467    );
468
469    let mut ticker = time::interval(config.interval);
470    ticker.tick().await; // skip immediate first tick
471
472    loop {
473        // Check cancellation before each iteration — lets the Ctrl-C handler
474        // interrupt even during an in-flight interval wait.
475        if *cancelled.lock().await {
476            let done_event = PresenceEvent::Done {
477                session_id: config.session_id.clone(),
478                subject: config.subject.clone(),
479                namespace: config.namespace.clone(),
480                ts_ms: now_millis(),
481            };
482            let _ = send_client_frame(&mut tx, &ClientFrame::Publish { event: &done_event }).await;
483            let _ = tx
484                .send(Message::Close(Some(CloseFrame {
485                    code: CloseCode::Normal,
486                    reason: "agent_done".into(),
487                })))
488                .await;
489            return Ok(LoopExit::Cancelled);
490        }
491
492        select! {
493            _ = ticker.tick() => {
494                let heartbeat = PresenceEvent::Heartbeat {
495                    session_id: config.session_id.clone(),
496                    subject: config.subject.clone(),
497                    namespace: config.namespace.clone(),
498                    cursor_path: None,
499                    recent_actions: Vec::new(),
500                    ts_ms: now_millis(),
501                };
502                if let Err(err) = send_client_frame(&mut tx, &ClientFrame::Publish { event: &heartbeat }).await {
503                    warn!(error = %err, "presence: heartbeat send failed; reconnecting");
504                    return Ok(LoopExit::Disconnected);
505                }
506            }
507            msg = rx.next() => {
508                match msg {
509                    Some(Ok(Message::Text(txt))) => {
510                        if let Ok(ServerFrame::Error { code, message }) = serde_json::from_str::<ServerFrame>(txt.as_str()) {
511                            warn!(code = %code, message = %message, "presence: server reported error");
512                            match code.as_str() {
513                                // Structural auth failure mid-stream — treat
514                                // identically to a 401 on connect so the
515                                // outer loop stops and the user gets the
516                                // "re-run `heddle auth login`" hint.
517                                "unauthorized" => return Err(ConnectError::Unauthorized),
518                                "forbidden" => {
519                                    return Err(ConnectError::Forbidden(anyhow!(
520                                        "server forbade publish: {message}"
521                                    )));
522                                }
523                                _ => return Ok(LoopExit::Disconnected),
524                            }
525                        }
526                        // Event/Ready frames are benign for a publisher — ignore.
527                    },
528                    Some(Ok(Message::Ping(p))) => match tx.send(Message::Pong(p)).await {
529                        Ok(()) => {}
530                        Err(_) => return Ok(LoopExit::Disconnected),
531                    },
532                    Some(Ok(Message::Close(_))) | None => return Ok(LoopExit::Disconnected),
533                    Some(Err(err)) => {
534                        warn!(error = %err, "presence: ws recv error");
535                        return Ok(LoopExit::Disconnected);
536                    }
537                    _ => {}
538                }
539            }
540            // Wake periodically so Ctrl-C doesn't have to wait the full
541            // interval before we check `cancelled` at the top of the loop.
542            () = tokio::time::sleep(Duration::from_millis(250)) => {}
543        }
544    }
545}
546
547async fn send_client_frame<S>(tx: &mut S, frame: &ClientFrame<'_>) -> Result<()>
548where
549    S: SinkExt<Message> + Unpin,
550    <S as futures::Sink<Message>>::Error: std::fmt::Display,
551{
552    let payload = serde_json::to_string(frame).context("serialise client frame")?;
553    tx.send(Message::Text(payload.into()))
554        .await
555        .map_err(|e| anyhow!("ws send: {e}"))?;
556    Ok(())
557}
558
559fn now_millis() -> u64 {
560    std::time::SystemTime::now()
561        .duration_since(std::time::UNIX_EPOCH)
562        .map(|d| d.as_millis() as u64)
563        .unwrap_or(0)
564}
565
566struct BackoffPlan {
567    next: Duration,
568}
569
570impl BackoffPlan {
571    fn new() -> Self {
572        Self {
573            next: Duration::from_secs(1),
574        }
575    }
576
577    fn next(&mut self) -> Duration {
578        let out = self.next;
579        self.next = (self.next * 2).min(Duration::from_secs(30));
580        out
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use chrono::Utc;
587    use objects::store::AgentStatus;
588
589    use super::*;
590
591    fn make_agent(session_id: &str) -> AgentEntry {
592        AgentEntry {
593            session_id: session_id.into(),
594            client_instance_id: None,
595            native_actor_key: None,
596            native_parent_actor_key: None,
597            native_instance_key: None,
598            heddle_session_id: None,
599            thread_id: None,
600            thread: format!("agent/{session_id}"),
601            pid: None,
602            boot_id: None,
603            liveness_path: None,
604            heartbeat_at: None,
605            anchor_state: None,
606            anchor_root: None,
607            reservation_token: None,
608            path: None,
609            base_state: "base".into(),
610            started_at: Utc::now(),
611            provider: Some("anthropic".into()),
612            model: Some("claude-sonnet-4-6".into()),
613            harness: None,
614            thinking_level: None,
615            usage_summary: Default::default(),
616            last_progress_at: None,
617            report_flush_state: None,
618            attach_reason: None,
619            attach_precedence: vec![],
620            winning_attach_rule: None,
621            probe_source: None,
622            probe_confidence: None,
623            status: AgentStatus::Active,
624            completed_at: None,
625            context_queries: vec![],
626        }
627    }
628
629    use cli_shared::config::UserPrincipalConfig;
630
631    fn user_with_token_and_principal() -> UserConfig {
632        let mut user = UserConfig::default();
633        user.remote.token = Some("opaque-token".into());
634        user.principal = Some(UserPrincipalConfig {
635            name: "Alice".into(),
636            email: "alice@example.com".into(),
637        });
638        user
639    }
640
641    #[test]
642    fn skips_when_upstream_missing() {
643        let hosted = HostedConfig {
644            upstream_url: None,
645            namespace: Some("heddle/core".into()),
646        };
647        let result = resolve_publisher_config(
648            &hosted,
649            &make_agent("agent-1"),
650            &user_with_token_and_principal(),
651            Duration::from_secs(15),
652        )
653        .unwrap();
654        assert!(result.is_none());
655    }
656
657    #[test]
658    fn skips_when_namespace_missing() {
659        let hosted = HostedConfig {
660            upstream_url: Some("https://heddle.example.com".into()),
661            namespace: None,
662        };
663        let result = resolve_publisher_config(
664            &hosted,
665            &make_agent("agent-1"),
666            &user_with_token_and_principal(),
667            Duration::from_secs(15),
668        )
669        .unwrap();
670        assert!(result.is_none());
671    }
672
673    #[test]
674    fn resolves_subject_from_principal_when_token_is_opaque() {
675        let hosted = HostedConfig {
676            upstream_url: Some("https://heddle.example.com".into()),
677            namespace: Some("heddle/core".into()),
678        };
679        let config = resolve_publisher_config(
680            &hosted,
681            &make_agent("agent-1"),
682            &user_with_token_and_principal(),
683            Duration::from_secs(15),
684        )
685        .unwrap()
686        .expect("config should resolve");
687        assert_eq!(config.subject, "alice@example.com");
688        assert_eq!(config.namespace, "heddle/core");
689        assert_eq!(config.ws_url, "wss://heddle.example.com/presence/ws");
690    }
691
692    #[test]
693    fn env_token_skips_malformed_credential_store() {
694        let _guard = crate::credentials::lock_test_env();
695        let temp = tempfile::TempDir::new().unwrap();
696        let original_home = std::env::var_os("HOME");
697        let original_token = std::env::var_os("HEDDLE_REMOTE_TOKEN");
698        unsafe {
699            std::env::set_var("HOME", temp.path());
700            std::env::set_var("HEDDLE_REMOTE_TOKEN", "env-token");
701        }
702        std::fs::create_dir_all(temp.path().join(".heddle")).unwrap();
703        std::fs::write(
704            temp.path().join(".heddle/credentials.toml"),
705            "this is not valid toml =",
706        )
707        .unwrap();
708
709        let hosted = HostedConfig {
710            upstream_url: Some("https://heddle.example.com".into()),
711            namespace: Some("heddle/core".into()),
712        };
713        let config = resolve_publisher_config(
714            &hosted,
715            &make_agent("agent-1"),
716            &UserConfig {
717                remote: Default::default(),
718                principal: user_with_token_and_principal().principal,
719                ..Default::default()
720            },
721            Duration::from_secs(15),
722        )
723        .unwrap()
724        .expect("config should resolve from env token");
725
726        unsafe {
727            if let Some(home) = original_home {
728                std::env::set_var("HOME", home);
729            } else {
730                std::env::remove_var("HOME");
731            }
732            if let Some(token) = original_token {
733                std::env::set_var("HEDDLE_REMOTE_TOKEN", token);
734            } else {
735                std::env::remove_var("HEDDLE_REMOTE_TOKEN");
736            }
737        }
738        assert_eq!(config.token, "env-token");
739    }
740
741    #[test]
742    fn normalises_https_upstream_to_wss() {
743        assert_eq!(
744            normalize_ws_url("https://heddle.example.com").unwrap(),
745            "wss://heddle.example.com/presence/ws"
746        );
747        assert_eq!(
748            normalize_ws_url("https://heddle.example.com/").unwrap(),
749            "wss://heddle.example.com/presence/ws"
750        );
751        assert_eq!(
752            normalize_ws_url("http://127.0.0.1:8421").unwrap(),
753            "ws://127.0.0.1:8421/presence/ws"
754        );
755        assert_eq!(
756            normalize_ws_url("ws://localhost:8421/any/path").unwrap(),
757            "ws://localhost:8421/presence/ws"
758        );
759    }
760
761    #[test]
762    fn errors_on_missing_token() {
763        // Isolate the env so an ambient `HEDDLE_REMOTE_TOKEN` (common in
764        // dev shells that source a `.env` for the hosted services) can't
765        // satisfy `user_config.remote_token()` and flip the error path
766        // to "could not derive subject from principal config" instead of
767        // the "no remote token available" message this test pins. The
768        // sibling tests use the same lock + save/restore dance.
769        let _guard = crate::credentials::lock_test_env();
770        let temp = tempfile::TempDir::new().unwrap();
771        let original_home = std::env::var_os("HOME");
772        let original_token = std::env::var_os("HEDDLE_REMOTE_TOKEN");
773        unsafe {
774            std::env::set_var("HOME", temp.path());
775            std::env::remove_var("HEDDLE_REMOTE_TOKEN");
776        }
777
778        let hosted = HostedConfig {
779            upstream_url: Some("https://heddle.example.com".into()),
780            namespace: Some("heddle/core".into()),
781        };
782        let user = UserConfig::default();
783        let err = resolve_publisher_config(
784            &hosted,
785            &make_agent("agent-1"),
786            &user,
787            Duration::from_secs(15),
788        )
789        .unwrap_err();
790        let msg = format!("{err}");
791
792        unsafe {
793            if let Some(home) = original_home {
794                std::env::set_var("HOME", home);
795            } else {
796                std::env::remove_var("HOME");
797            }
798            if let Some(token) = original_token {
799                std::env::set_var("HEDDLE_REMOTE_TOKEN", token);
800            } else {
801                std::env::remove_var("HEDDLE_REMOTE_TOKEN");
802            }
803        }
804
805        assert!(msg.contains("remote token"), "unexpected err: {msg}");
806    }
807}