1use std::{path::Path, sync::Arc, time::Duration};
24
25use anyhow::{Context, Result, anyhow};
26use cli_shared::UserConfig;
27use futures::{SinkExt, StreamExt};
28use objects::store::{AgentEntry, AgentRegistry};
29use repo::{HostedConfig, Repository};
30use serde::{Deserialize, Serialize};
31use tokio::{
32 select,
33 sync::Mutex,
34 time::{self, Instant},
35};
36use tokio_tungstenite::{
37 connect_async,
38 tungstenite::{
39 client::IntoClientRequest,
40 http::header::AUTHORIZATION,
41 protocol::{CloseFrame, Message, frame::coding::CloseCode},
42 },
43};
44use tracing::{debug, info, warn};
45use weft_client_shim::CliContext;
46
47use crate::credentials;
48
/// Presence events describing an agent session's lifecycle, published to the hosted
/// presence service wrapped in `ClientFrame::Publish`.
///
/// Serialised as a JSON object with an internal `kind` tag whose value is `agent_start`,
/// `agent_heartbeat` or `agent_done`. Optional fields are omitted when `None`/empty and
/// fall back to their default when absent during deserialisation.
// NOTE(review): none of the current variant names share a common prefix/suffix, so this
// `enum_variant_names` allow may be stale — confirm with clippy before removing it.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum PresenceEvent {
    /// Announces a session; the publisher sends one per connection after the `hello` exchange.
    #[serde(rename = "agent_start")]
    Start {
        session_id: String,
        subject: String,
        namespace: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        model: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        provider: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cursor_path: Option<String>,
        /// Start time in milliseconds since the Unix epoch (the publisher fills it from `now_millis()`).
        started_at_ms: u64,
    },
    /// Periodic liveness signal, sent every `PublisherConfig::interval`.
    #[serde(rename = "agent_heartbeat")]
    Heartbeat {
        session_id: String,
        subject: String,
        namespace: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cursor_path: Option<String>,
        /// Omitted from the JSON entirely when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        recent_actions: Vec<String>,
        /// Send time in milliseconds since the Unix epoch.
        ts_ms: u64,
    },
    /// Final event sent best-effort when the publisher is cancelled.
    #[serde(rename = "agent_done")]
    Done {
        session_id: String,
        subject: String,
        namespace: String,
        /// Send time in milliseconds since the Unix epoch.
        ts_ms: u64,
    },
}
95
/// Frames the CLI sends over the presence WebSocket, serialised as JSON objects with an
/// internal `type` tag (`"hello"` or `"publish"`).
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ClientFrame<'a> {
    /// Opening frame sent right after the handshake; `connect_and_stream` uses `role: "cli"`.
    Hello { role: &'a str },
    /// Publishes a single presence event.
    Publish { event: &'a PresenceEvent },
}

/// Frames received from the presence server, tagged by `type` (`"ready"`, `"event"`,
/// `"error"`).
///
/// Only `Error` is acted upon by the publisher; the other variants' payloads are parsed
/// but never read, hence the `dead_code` allowances.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ServerFrame {
    Ready {
        #[allow(dead_code)]
        subscribed: Vec<String>,
    },
    Event {
        #[allow(dead_code)]
        ts_ms: u64,
        #[allow(dead_code)]
        event: serde_json::Value,
    },
    /// Server-reported failure; `code` values `"unauthorized"` and `"forbidden"` are treated
    /// as non-retryable by `connect_and_stream`.
    Error {
        code: String,
        message: String,
    },
}
121
/// Fully resolved settings for one presence publisher run.
///
/// `Debug` is implemented by hand so the bearer `token` is never written to logs, error
/// reports or panic messages; every other field is printed as-is.
#[derive(Clone)]
pub struct PublisherConfig {
    /// Agent session id included in every published event.
    pub session_id: String,
    /// Identity included in every published event (principal email or credential subject).
    pub subject: String,
    /// Hosted namespace included in every published event.
    pub namespace: String,
    /// Optional model name reported in `agent_start`.
    pub model: Option<String>,
    /// Optional provider name reported in `agent_start`.
    pub provider: Option<String>,
    /// Bearer token sent in the WebSocket handshake's `Authorization` header. Secret.
    pub token: String,
    /// Presence WebSocket endpoint (`ws://…/presence/ws` or `wss://…/presence/ws`).
    pub ws_url: String,
    /// Time between heartbeats; must be non-zero (`tokio::time::interval` panics on zero).
    pub interval: Duration,
}

impl std::fmt::Debug for PublisherConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PublisherConfig")
            .field("session_id", &self.session_id)
            .field("subject", &self.subject)
            .field("namespace", &self.namespace)
            .field("model", &self.model)
            .field("provider", &self.provider)
            // Never print the credential itself.
            .field("token", &"<redacted>")
            .field("ws_url", &self.ws_url)
            .field("interval", &self.interval)
            .finish()
    }
}
134
135pub async fn cmd_presence_publish(
137 ctx: &dyn CliContext,
138 session: String,
139 interval_secs: u64,
140) -> Result<()> {
141 let repo_root = match ctx.repo_path() {
142 Some(p) => p.to_path_buf(),
143 None => std::env::current_dir()?,
144 };
145 let repo = Repository::open(&repo_root).with_context(|| {
146 format!(
147 "failed to open Heddle repository at {}",
148 repo_root.display()
149 )
150 })?;
151
152 let hosted = repo.config().hosted.clone();
153 let agent = load_agent_entry(repo.heddle_dir(), &session)?;
154
155 let user_config = UserConfig::load_default()?;
156
157 match resolve_publisher_config(
158 &hosted,
159 &agent,
160 &user_config,
161 Duration::from_secs(interval_secs),
162 )? {
163 Some(config) => run_publisher(config).await,
164 None => Err(anyhow!(
165 "hosted presence requires a repository linked to a Heddle hosted upstream. Configure [hosted] in .heddle/config.toml or use a hosted-enabled repository."
166 )),
167 }
168}
169
170fn load_agent_entry(heddle_dir: &Path, session: &str) -> Result<AgentEntry> {
171 let registry = AgentRegistry::new(heddle_dir);
172 registry
173 .load(session)
174 .with_context(|| format!("failed to read agent registry for session '{session}'"))?
175 .ok_or_else(|| {
176 anyhow!(
177 "no agent entry found for session '{session}' at {}",
178 heddle_dir
179 .join("agents")
180 .join(format!("{session}.toml"))
181 .display()
182 )
183 })
184}
185
186pub fn resolve_publisher_config(
192 hosted: &HostedConfig,
193 agent: &AgentEntry,
194 user_config: &UserConfig,
195 interval: Duration,
196) -> Result<Option<PublisherConfig>> {
197 let (Some(upstream), Some(namespace)) = (
198 hosted.upstream_url.as_deref().filter(|s| !s.is_empty()),
199 hosted.namespace.as_deref().filter(|s| !s.is_empty()),
200 ) else {
201 return Ok(None);
202 };
203
204 let (token, credential_subject) = if let Some(token) = user_config.remote_token()? {
205 (token.id, None)
206 } else {
207 let stored_credential = credentials::resolve_credential_for_server(upstream)
208 .with_context(|| format!("loading stored credential for {upstream}"))?;
209 if let Some(credential) = stored_credential {
210 (credential.token, Some(credential.subject))
211 } else {
212 return Err(anyhow!(
213 "no remote token available — set HEDDLE_REMOTE_TOKEN or run `heddle auth login`"
214 ));
215 }
216 };
217
218 let ws_url = normalize_ws_url(upstream)?;
219
220 let subject = user_config
224 .principal
225 .as_ref()
226 .map(|p| p.email.clone())
227 .or(credential_subject)
228 .ok_or_else(|| {
229 anyhow!("could not derive subject from principal config — run `heddle auth login`")
230 })?;
231
232 Ok(Some(PublisherConfig {
233 session_id: agent.session_id.clone(),
234 subject,
235 namespace: namespace.to_string(),
236 model: agent.model.clone(),
237 provider: agent.provider.clone(),
238 token,
239 ws_url,
240 interval,
241 }))
242}
243
244fn normalize_ws_url(upstream: &str) -> Result<String> {
250 let trimmed = upstream.trim_end_matches('/');
251 let (scheme_end, ws_scheme) = if let Some(rest) = trimmed.strip_prefix("https://") {
252 (trimmed.len() - rest.len(), "wss://")
253 } else if let Some(rest) = trimmed.strip_prefix("http://") {
254 (trimmed.len() - rest.len(), "ws://")
255 } else if trimmed.starts_with("ws://") || trimmed.starts_with("wss://") {
256 return Ok(strip_path_and_append_presence(trimmed));
257 } else {
258 return Err(anyhow!(
259 "unsupported upstream URL '{upstream}' (expected http(s):// or ws(s)://)"
260 ));
261 };
262 let host_start = scheme_end;
264 let after = &trimmed[host_start..];
265 let host = match after.find('/') {
266 Some(idx) => &after[..idx],
267 None => after,
268 };
269 Ok(format!("{ws_scheme}{host}/presence/ws"))
270}
271
/// Points a `ws://` / `wss://` URL at the presence endpoint on the same authority.
///
/// Everything after the authority (path, query, fragment) is dropped and replaced with
/// `/presence/ws`. A URL with any other scheme is returned unchanged.
fn strip_path_and_append_presence(url: &str) -> String {
    let (scheme, rest) = if let Some(r) = url.strip_prefix("wss://") {
        ("wss://", r)
    } else if let Some(r) = url.strip_prefix("ws://") {
        ("ws://", r)
    } else {
        return url.to_string();
    };
    // RFC 3986 §3.2: the authority ends at the first '/', '?' or '#'.
    let host = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .map_or(rest, |end| &rest[..end]);
    format!("{scheme}{host}/presence/ws")
}
283
284pub async fn run_publisher(config: PublisherConfig) -> Result<()> {
286 let config = Arc::new(config);
287 let cancelled = Arc::new(Mutex::new(false));
288 let cancel_signal = {
289 let cancelled = cancelled.clone();
290 async move {
291 if tokio::signal::ctrl_c().await.is_ok() {
292 *cancelled.lock().await = true;
293 info!("presence: Ctrl-C received, shutting down");
294 }
295 }
296 };
297 tokio::spawn(cancel_signal);
298
299 let mut backoff = BackoffPlan::new();
300 loop {
301 if *cancelled.lock().await {
302 return Ok(());
303 }
304 match connect_and_stream(config.clone(), cancelled.clone()).await {
305 Ok(exit) => match exit {
306 LoopExit::Cancelled => return Ok(()),
307 LoopExit::Disconnected => {
308 let delay = backoff.next();
309 warn!(
310 retry_in_ms = delay.as_millis() as u64,
311 "presence: disconnected, reconnecting"
312 );
313 if wait_or_cancel(delay, cancelled.clone()).await {
314 return Ok(());
315 }
316 }
317 },
318 Err(ConnectError::Unauthorized) => {
319 warn!(
323 "presence: authentication failed (401) — token expired or revoked. \
324 Re-run `heddle auth login` to continue publishing. \
325 (Device-bound tokens have a 30-day TTL and are recommended for \
326 long-running agent sessions.)"
327 );
328 return Err(anyhow!("presence publisher: unauthorized"));
329 }
330 Err(ConnectError::Forbidden(err)) => {
331 warn!(
332 error = %err,
333 "presence: server returned 403 forbidden — scope mismatch or namespace \
334 not provisioned. Not retrying.",
335 );
336 return Err(anyhow!("presence publisher: forbidden — {err}"));
337 }
338 Err(ConnectError::Fatal(err)) => {
339 return Err(err);
340 }
341 Err(ConnectError::Transient(err)) => {
342 let delay = backoff.next();
343 warn!(error = %err, retry_in_ms = delay.as_millis() as u64, "presence: transient error, reconnecting");
344 if wait_or_cancel(delay, cancelled.clone()).await {
345 return Ok(());
346 }
347 }
348 }
349 }
350}
351
352enum LoopExit {
353 Cancelled,
354 Disconnected,
355}
356
357enum ConnectError {
358 Unauthorized,
361 Forbidden(anyhow::Error),
364 Transient(anyhow::Error),
365 Fatal(anyhow::Error),
366}
367
368impl From<anyhow::Error> for ConnectError {
369 fn from(err: anyhow::Error) -> Self {
370 ConnectError::Transient(err)
371 }
372}
373
374async fn wait_or_cancel(delay: Duration, cancelled: Arc<Mutex<bool>>) -> bool {
375 let deadline = Instant::now() + delay;
376 loop {
377 if *cancelled.lock().await {
378 return true;
379 }
380 let now = Instant::now();
381 if now >= deadline {
382 return false;
383 }
384 let step = (deadline - now).min(Duration::from_millis(250));
385 tokio::time::sleep(step).await;
386 }
387}
388
/// Runs one connection attempt, in order:
/// 1. the WebSocket handshake;
/// 2. the `hello` exchange;
/// 3. a single `agent_start` event;
/// 4. a heartbeat loop that runs until cancellation, disconnect or a server error.
///
/// Returns `Ok(LoopExit::Cancelled)` once `cancelled` is observed as `true`; before that it
/// makes a best-effort attempt to send `agent_done` and a normal close frame. It returns
/// `Ok(LoopExit::Disconnected)` whenever the caller should reconnect.
///
/// Error classification:
/// - HTTP 401/403 on the handshake, and `unauthorized`/`forbidden` server error frames,
///   map to the non-retryable `ConnectError` variants.
/// - An invalid URL or token, or a rejected `hello`, is `Fatal`.
/// - Other failures before streaming starts are `Transient`.
async fn connect_and_stream(
    config: Arc<PublisherConfig>,
    cancelled: Arc<Mutex<bool>>,
) -> Result<LoopExit, ConnectError> {
    debug!(url = %config.ws_url, "presence: connecting");

    // A malformed URL or token can never succeed on retry, so both are `Fatal`.
    let request = config
        .ws_url
        .as_str()
        .into_client_request()
        .map_err(|e| ConnectError::Fatal(anyhow!("invalid WS URL: {e}")))?;
    let mut request = request;
    // Authenticate the upgrade request with the bearer token.
    request.headers_mut().insert(
        AUTHORIZATION,
        format!("Bearer {}", config.token)
            .parse()
            .map_err(|e| ConnectError::Fatal(anyhow!("invalid bearer token: {e}")))?,
    );

    // 401/403 handshake responses stop the publisher; every other connect failure
    // (network errors, other HTTP statuses) is left to the caller's retry loop.
    let (ws, _resp) = match connect_async(request).await {
        Ok(pair) => pair,
        Err(tokio_tungstenite::tungstenite::Error::Http(resp)) if resp.status() == 401 => {
            return Err(ConnectError::Unauthorized);
        }
        Err(tokio_tungstenite::tungstenite::Error::Http(resp)) if resp.status() == 403 => {
            return Err(ConnectError::Forbidden(anyhow!(
                "server returned 403 on WebSocket handshake"
            )));
        }
        Err(err) => return Err(ConnectError::Transient(anyhow!("ws connect: {err}"))),
    };

    let (mut tx, mut rx) = ws.split();

    send_client_frame(&mut tx, &ClientFrame::Hello { role: "cli" })
        .await
        .map_err(ConnectError::Transient)?;

    // Wait up to 10 s for the server's reply to `hello`. Only a parseable `Error` frame
    // counts as a rejection. Any other text (e.g. `Ready`, or JSON that does not parse)
    // and any non-text message fall through, and the session proceeds.
    match tokio::time::timeout(Duration::from_secs(10), rx.next()).await {
        Ok(Some(Ok(Message::Text(txt)))) => {
            if let Ok(ServerFrame::Error { code, message }) =
                serde_json::from_str::<ServerFrame>(txt.as_str())
            {
                return Err(ConnectError::Fatal(anyhow!(
                    "server rejected hello: {code} — {message}"
                )));
            }
        }
        Ok(Some(Err(err))) => return Err(ConnectError::Transient(anyhow!("ws: {err}"))),
        Ok(None) => return Ok(LoopExit::Disconnected),
        Err(_) => return Err(ConnectError::Transient(anyhow!("hello timeout"))),
        _ => {}
    }

    // Announce the session once per successful connection.
    let start_event = PresenceEvent::Start {
        session_id: config.session_id.clone(),
        subject: config.subject.clone(),
        namespace: config.namespace.clone(),
        model: config.model.clone(),
        provider: config.provider.clone(),
        cursor_path: None,
        started_at_ms: now_millis(),
    };
    send_client_frame(
        &mut tx,
        &ClientFrame::Publish {
            event: &start_event,
        },
    )
    .await
    .map_err(ConnectError::Transient)?;
    info!(
        session = %config.session_id,
        namespace = %config.namespace,
        "presence: published agent_start"
    );

    // NOTE(review): `time::interval` panics on a zero period, and this function does not
    // guard `config.interval` itself — confirm every caller rejects zero.
    // NOTE(review): with tokio's default missed-tick behaviour (`Burst`), several heartbeats
    // may go out back-to-back after a stall (e.g. system suspend); consider
    // `MissedTickBehavior::Delay`.
    let mut ticker = time::interval(config.interval);
    // tokio's interval completes its first tick immediately. Consuming it here means the
    // first heartbeat goes out one full interval after `agent_start`.
    ticker.tick().await;
    loop {
        // Cancellation is checked once per iteration. The 250 ms `sleep` arm of the
        // `select!` below guarantees the loop comes back here even when idle.
        if *cancelled.lock().await {
            // Best-effort goodbye: send failures are ignored because we are shutting down.
            let done_event = PresenceEvent::Done {
                session_id: config.session_id.clone(),
                subject: config.subject.clone(),
                namespace: config.namespace.clone(),
                ts_ms: now_millis(),
            };
            let _ = send_client_frame(&mut tx, &ClientFrame::Publish { event: &done_event }).await;
            let _ = tx
                .send(Message::Close(Some(CloseFrame {
                    code: CloseCode::Normal,
                    reason: "agent_done".into(),
                })))
                .await;
            return Ok(LoopExit::Cancelled);
        }

        select! {
            // Periodic heartbeat; a failed send ends this session so the caller reconnects.
            _ = ticker.tick() => {
                let heartbeat = PresenceEvent::Heartbeat {
                    session_id: config.session_id.clone(),
                    subject: config.subject.clone(),
                    namespace: config.namespace.clone(),
                    cursor_path: None,
                    recent_actions: Vec::new(),
                    ts_ms: now_millis(),
                };
                if let Err(err) = send_client_frame(&mut tx, &ClientFrame::Publish { event: &heartbeat }).await {
                    warn!(error = %err, "presence: heartbeat send failed; reconnecting");
                    return Ok(LoopExit::Disconnected);
                }
            }
            msg = rx.next() => {
                match msg {
                    // Server error frames: auth/scope codes are terminal; any other code
                    // ends this session and lets the caller reconnect. Non-error text
                    // frames are ignored.
                    Some(Ok(Message::Text(txt))) => {
                        if let Ok(ServerFrame::Error { code, message }) = serde_json::from_str::<ServerFrame>(txt.as_str()) {
                            warn!(code = %code, message = %message, "presence: server reported error");
                            match code.as_str() {
                                "unauthorized" => return Err(ConnectError::Unauthorized),
                                "forbidden" => {
                                    return Err(ConnectError::Forbidden(anyhow!(
                                        "server forbade publish: {message}"
                                    )));
                                }
                                _ => return Ok(LoopExit::Disconnected),
                            }
                        }
                    },
                    // Answer pings explicitly; if the pong cannot be sent the socket is gone.
                    Some(Ok(Message::Ping(p))) => match tx.send(Message::Pong(p)).await {
                        Ok(()) => {}
                        Err(_) => return Ok(LoopExit::Disconnected),
                    },
                    Some(Ok(Message::Close(_))) | None => return Ok(LoopExit::Disconnected),
                    Some(Err(err)) => {
                        warn!(error = %err, "presence: ws recv error");
                        return Ok(LoopExit::Disconnected);
                    }
                    _ => {}
                }
            }
            // Wake-up arm so the cancellation check at the top of the loop runs regularly.
            () = tokio::time::sleep(Duration::from_millis(250)) => {}
        }
    }
}
546
547async fn send_client_frame<S>(tx: &mut S, frame: &ClientFrame<'_>) -> Result<()>
548where
549 S: SinkExt<Message> + Unpin,
550 <S as futures::Sink<Message>>::Error: std::fmt::Display,
551{
552 let payload = serde_json::to_string(frame).context("serialise client frame")?;
553 tx.send(Message::Text(payload.into()))
554 .await
555 .map_err(|e| anyhow!("ws send: {e}"))?;
556 Ok(())
557}
558
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
565
/// Exponential reconnect backoff: 1 s, 2 s, 4 s, … capped at 30 s.
struct BackoffPlan {
    /// Delay that the next call to [`BackoffPlan::next`] will hand out.
    pending: Duration,
}

impl BackoffPlan {
    const INITIAL: Duration = Duration::from_secs(1);
    const CEILING: Duration = Duration::from_secs(30);

    /// Starts a fresh plan whose first delay is one second.
    fn new() -> Self {
        Self {
            pending: Self::INITIAL,
        }
    }

    /// Returns the current delay and doubles the stored one, never exceeding 30 s.
    fn next(&mut self) -> Duration {
        let current = self.pending;
        let doubled = current * 2;
        self.pending = if doubled < Self::CEILING {
            doubled
        } else {
            Self::CEILING
        };
        current
    }
}
583
#[cfg(test)]
mod tests {
    use std::ffi::OsString;

    use chrono::Utc;
    use cli_shared::config::UserPrincipalConfig;
    use objects::store::AgentStatus;

    use super::*;

    /// Snapshot of selected environment variables that is restored when dropped.
    ///
    /// Create it *after* taking `credentials::lock_test_env()`. Locals drop in reverse
    /// order, so the restore then runs while the lock is still held, even when an
    /// assertion or `unwrap` panics. The original manual restore at the end of each test
    /// was skipped on panic and leaked the mutated values into later tests.
    struct EnvRestore {
        saved: Vec<(&'static str, Option<OsString>)>,
    }

    impl EnvRestore {
        fn capture(keys: &[&'static str]) -> Self {
            Self {
                saved: keys.iter().map(|&key| (key, std::env::var_os(key))).collect(),
            }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            for (key, original) in self.saved.drain(..) {
                // SAFETY: callers hold `crate::credentials::lock_test_env()` for longer than
                // this guard lives, which serialises with every other test taking that lock.
                unsafe {
                    match original {
                        Some(value) => std::env::set_var(key, value),
                        None => std::env::remove_var(key),
                    }
                }
            }
        }
    }

    fn make_agent(session_id: &str) -> AgentEntry {
        AgentEntry {
            session_id: session_id.into(),
            client_instance_id: None,
            native_actor_key: None,
            native_parent_actor_key: None,
            native_instance_key: None,
            heddle_session_id: None,
            thread_id: None,
            thread: format!("agent/{session_id}"),
            pid: None,
            boot_id: None,
            liveness_path: None,
            heartbeat_at: None,
            anchor_state: None,
            anchor_root: None,
            reservation_token: None,
            path: None,
            base_state: "base".into(),
            started_at: Utc::now(),
            provider: Some("anthropic".into()),
            model: Some("claude-sonnet-4-6".into()),
            harness: None,
            thinking_level: None,
            usage_summary: Default::default(),
            last_progress_at: None,
            report_flush_state: None,
            attach_reason: None,
            attach_precedence: vec![],
            winning_attach_rule: None,
            probe_source: None,
            probe_confidence: None,
            status: AgentStatus::Active,
            completed_at: None,
            context_queries: vec![],
        }
    }

    fn user_with_token_and_principal() -> UserConfig {
        let mut user = UserConfig::default();
        user.remote.token = Some("opaque-token".into());
        user.principal = Some(UserPrincipalConfig {
            name: "Alice".into(),
            email: "alice@example.com".into(),
        });
        user
    }

    #[test]
    fn skips_when_upstream_missing() {
        let hosted = HostedConfig {
            upstream_url: None,
            namespace: Some("heddle/core".into()),
        };
        let result = resolve_publisher_config(
            &hosted,
            &make_agent("agent-1"),
            &user_with_token_and_principal(),
            Duration::from_secs(15),
        )
        .unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn skips_when_namespace_missing() {
        let hosted = HostedConfig {
            upstream_url: Some("https://heddle.example.com".into()),
            namespace: None,
        };
        let result = resolve_publisher_config(
            &hosted,
            &make_agent("agent-1"),
            &user_with_token_and_principal(),
            Duration::from_secs(15),
        )
        .unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn resolves_subject_from_principal_when_token_is_opaque() {
        // `remote_token()` may consult `HEDDLE_REMOTE_TOKEN`, which sibling tests mutate;
        // hold the env lock so this test never reads the environment mid-update.
        let _guard = crate::credentials::lock_test_env();
        let hosted = HostedConfig {
            upstream_url: Some("https://heddle.example.com".into()),
            namespace: Some("heddle/core".into()),
        };
        let config = resolve_publisher_config(
            &hosted,
            &make_agent("agent-1"),
            &user_with_token_and_principal(),
            Duration::from_secs(15),
        )
        .unwrap()
        .expect("config should resolve");
        assert_eq!(config.subject, "alice@example.com");
        assert_eq!(config.namespace, "heddle/core");
        assert_eq!(config.ws_url, "wss://heddle.example.com/presence/ws");
    }

    #[test]
    fn env_token_skips_malformed_credential_store() {
        let _guard = crate::credentials::lock_test_env();
        let temp = tempfile::TempDir::new().unwrap();
        // Declared after `temp`, so the environment is restored before the temp dir is removed.
        let _env = EnvRestore::capture(&["HOME", "HEDDLE_REMOTE_TOKEN"]);
        // SAFETY: the env lock is held and `_env` restores both variables on drop.
        unsafe {
            std::env::set_var("HOME", temp.path());
            std::env::set_var("HEDDLE_REMOTE_TOKEN", "env-token");
        }
        std::fs::create_dir_all(temp.path().join(".heddle")).unwrap();
        std::fs::write(
            temp.path().join(".heddle/credentials.toml"),
            "this is not valid toml =",
        )
        .unwrap();

        let hosted = HostedConfig {
            upstream_url: Some("https://heddle.example.com".into()),
            namespace: Some("heddle/core".into()),
        };
        let config = resolve_publisher_config(
            &hosted,
            &make_agent("agent-1"),
            &UserConfig {
                remote: Default::default(),
                principal: user_with_token_and_principal().principal,
                ..Default::default()
            },
            Duration::from_secs(15),
        )
        .unwrap()
        .expect("config should resolve from env token");

        assert_eq!(config.token, "env-token");
    }

    #[test]
    fn normalises_https_upstream_to_wss() {
        assert_eq!(
            normalize_ws_url("https://heddle.example.com").unwrap(),
            "wss://heddle.example.com/presence/ws"
        );
        assert_eq!(
            normalize_ws_url("https://heddle.example.com/").unwrap(),
            "wss://heddle.example.com/presence/ws"
        );
        assert_eq!(
            normalize_ws_url("http://127.0.0.1:8421").unwrap(),
            "ws://127.0.0.1:8421/presence/ws"
        );
        assert_eq!(
            normalize_ws_url("ws://localhost:8421/any/path").unwrap(),
            "ws://localhost:8421/presence/ws"
        );
    }

    #[test]
    fn errors_on_missing_token() {
        let _guard = crate::credentials::lock_test_env();
        let temp = tempfile::TempDir::new().unwrap();
        // Declared after `temp`, so the environment is restored before the temp dir is removed.
        let _env = EnvRestore::capture(&["HOME", "HEDDLE_REMOTE_TOKEN"]);
        // SAFETY: the env lock is held and `_env` restores both variables on drop.
        unsafe {
            std::env::set_var("HOME", temp.path());
            std::env::remove_var("HEDDLE_REMOTE_TOKEN");
        }

        let hosted = HostedConfig {
            upstream_url: Some("https://heddle.example.com".into()),
            namespace: Some("heddle/core".into()),
        };
        let user = UserConfig::default();
        let err = resolve_publisher_config(
            &hosted,
            &make_agent("agent-1"),
            &user,
            Duration::from_secs(15),
        )
        .unwrap_err();
        let msg = format!("{err}");

        assert!(msg.contains("remote token"), "unexpected err: {msg}");
    }
}