1use std::fs;
2use std::path::{Path, PathBuf};
3use std::time::Duration;
4
5use base64::Engine;
6use base64::engine::general_purpose::URL_SAFE_NO_PAD;
7use getrandom::fill as getrandom_fill;
8use serde::{Deserialize, Serialize};
9
10use crate::constants::{AGENTS_DIR, AIT_FILE_NAME, SECRET_KEY_FILE_NAME};
11use crate::db::SqliteStore;
12use crate::did::parse_agent_did;
13use crate::error::{CoreError, Result};
14use crate::http::blocking_client;
15use crate::identity::decode_secret_key;
16use crate::peers::{
17 PersistPeerInput, load_peers_config, persist_peer, sync_openclaw_relay_peers_snapshot,
18};
19use crate::qr::decode_ticket_from_png;
20use crate::signing::{SignHttpRequestInput, sign_http_request};
21
/// Relative path of the proxy endpoint that `start_pairing` posts to.
pub const PAIR_START_PATH: &str = "/pair/start";
/// Relative path of the proxy endpoint that `confirm_pairing` posts to.
pub const PAIR_CONFIRM_PATH: &str = "/pair/confirm";
/// Relative path of the proxy endpoint queried for pairing status.
pub const PAIR_STATUS_PATH: &str = "/pair/status";
/// Prefix every pairing ticket must start with; the remainder is an unpadded
/// URL-safe base64 encoding of a JSON payload (see `parse_pairing_ticket`).
pub const PAIRING_TICKET_PREFIX: &str = "clwpair1_";

/// Wait budget in seconds used by `PairStatusOptions::default` and substituted
/// by `get_pairing_status` when `wait_seconds` is zero.
pub const DEFAULT_STATUS_WAIT_SECONDS: u64 = 300;
/// Poll interval in seconds used by `PairStatusOptions::default` and substituted
/// by `get_pairing_status` when `poll_interval_seconds` is zero.
pub const DEFAULT_STATUS_POLL_INTERVAL_SECONDS: u64 = 3;
29
/// Profile of one pairing participant, (de)serialized with camelCase keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PairProfile {
    /// Name of the agent; must be non-blank to pass `parse_pair_profile`.
    pub agent_name: String,
    /// Name of the human; must be non-blank to pass `parse_pair_profile`.
    pub human_name: String,
    /// Optional proxy origin advertised by the participant; left out of the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub proxy_origin: Option<String>,
}
38
/// Outcome of `start_pairing`, (de)serialized with camelCase keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PairStartResult {
    /// DID of the initiating agent as reported by the proxy.
    pub initiator_agent_did: String,
    /// Initiator profile as reported by the proxy, after normalization.
    pub initiator_profile: PairProfile,
    /// Pairing ticket issued by the proxy, validated by `parse_pairing_ticket`.
    pub ticket: String,
    /// Expiry value reported by the proxy, kept as the string it was sent as.
    pub expires_at: String,
    /// Normalized proxy URL the request was sent to.
    pub proxy_url: String,
    /// Path of a QR image for the ticket; `start_pairing` always leaves this
    /// `None`, and it is left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub qr_path: Option<PathBuf>,
}
50
/// Outcome of `confirm_pairing`, (de)serialized with camelCase keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PairConfirmResult {
    /// Whether the pairing was confirmed; `confirm_pairing` only returns `true`.
    pub paired: bool,
    /// DID of the agent that started the pairing.
    pub initiator_agent_did: String,
    /// Profile of the agent that started the pairing.
    pub initiator_profile: PairProfile,
    /// DID of the agent that confirmed the pairing.
    pub responder_agent_did: String,
    /// Profile of the agent that confirmed the pairing.
    pub responder_profile: PairProfile,
    /// Proxy origin the confirmation was sent to (derived from the ticket issuer).
    pub proxy_url: String,
    /// Alias under which the peer was stored locally; left out of the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub peer_alias: Option<String>,
}
63
/// State of a pairing ticket; serialized as `"pending"` or `"confirmed"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum PairStatusKind {
    /// The proxy reported the status string `pending`.
    Pending,
    /// The proxy reported the status string `confirmed`.
    Confirmed,
}
70
/// Outcome of a pairing status query, (de)serialized with camelCase keys.
/// Every `Option` field is left out of the serialized form when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PairStatusResult {
    /// Current state of the pairing.
    pub status: PairStatusKind,
    /// DID of the agent that started the pairing.
    pub initiator_agent_did: String,
    /// Profile of the agent that started the pairing.
    pub initiator_profile: PairProfile,
    /// DID of the confirming agent, when the proxy reported one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responder_agent_did: Option<String>,
    /// Profile of the confirming agent, when the proxy reported one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub responder_profile: Option<PairProfile>,
    /// Expiry value reported by the proxy, kept as the string it was sent as.
    pub expires_at: String,
    /// Confirmation time reported by the proxy, when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub confirmed_at: Option<String>,
    /// Normalized proxy URL that was queried.
    pub proxy_url: String,
    /// Alias under which the peer was stored locally; only set once the
    /// pairing is confirmed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub peer_alias: Option<String>,
}
88
/// Source of the pairing ticket handed to `confirm_pairing`.
#[derive(Debug, Clone)]
pub enum PairConfirmInput {
    /// The ticket text itself.
    Ticket(String),
    /// Path to an image file that is read and decoded with `decode_ticket_from_png`.
    QrFile(PathBuf),
}
94
/// Controls whether `get_pairing_status` queries once or polls until confirmed.
#[derive(Debug, Clone)]
pub struct PairStatusOptions {
    /// When `false`, a single status query is made and the other fields are ignored.
    pub wait: bool,
    /// Maximum time to keep polling, in seconds; zero selects
    /// `DEFAULT_STATUS_WAIT_SECONDS`.
    pub wait_seconds: u64,
    /// Pause between polls, in seconds; zero selects
    /// `DEFAULT_STATUS_POLL_INTERVAL_SECONDS`.
    pub poll_interval_seconds: u64,
}
101
impl Default for PairStatusOptions {
    /// Returns options for a single, non-waiting query, with the wait budget
    /// and poll interval pre-filled from the module defaults.
    fn default() -> Self {
        Self {
            wait: false,
            wait_seconds: DEFAULT_STATUS_WAIT_SECONDS,
            poll_interval_seconds: DEFAULT_STATUS_POLL_INTERVAL_SECONDS,
        }
    }
}
111
/// Credentials of the local agent loaded by `read_local_agent_proof_material`.
#[derive(Debug, Clone)]
struct LocalAgentProofMaterial {
    /// Trimmed contents of the agent's AIT file; sent in the `authorization` header.
    ait: String,
    /// Signing key decoded from the agent's secret key file.
    secret_key: ed25519_dalek::SigningKey,
    /// Agent DID taken from the `sub` entry of the AIT payload.
    agent_did: String,
}
118
/// Shape of the JSON body expected from the pair-start endpoint (camelCase keys).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PairStartResponsePayload {
    /// Ticket issued for this pairing attempt.
    ticket: String,
    /// DID of the initiating agent.
    initiator_agent_did: String,
    /// Profile of the initiating agent.
    initiator_profile: PairProfile,
    /// Expiry value, kept as the string it was sent as.
    expires_at: String,
}
127
/// Shape of the JSON body expected from the pair-confirm endpoint (camelCase keys).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PairConfirmResponsePayload {
    /// Must be `true` for `confirm_pairing` to accept the response.
    paired: bool,
    /// DID of the agent that started the pairing.
    initiator_agent_did: String,
    /// Profile of the agent that started the pairing.
    initiator_profile: PairProfile,
    /// DID of the agent that confirmed the pairing.
    responder_agent_did: String,
    /// Profile of the agent that confirmed the pairing.
    responder_profile: PairProfile,
}
137
/// Shape of the JSON body expected from the pair-status endpoint (camelCase keys).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PairStatusResponsePayload {
    /// Raw status string; only `pending` and `confirmed` are accepted by
    /// `get_pairing_status_once`.
    status: String,
    /// DID of the agent that started the pairing.
    initiator_agent_did: String,
    /// Profile of the agent that started the pairing.
    initiator_profile: PairProfile,
    /// DID of the confirming agent; required once the status is `confirmed`.
    responder_agent_did: Option<String>,
    /// Profile of the confirming agent; required once the status is `confirmed`.
    responder_profile: Option<PairProfile>,
    /// Expiry value, kept as the string it was sent as.
    expires_at: String,
    /// Confirmation time, when the proxy reports one.
    confirmed_at: Option<String>,
}
149
150fn parse_non_empty(value: &str, field: &str) -> Result<String> {
151 let trimmed = value.trim();
152 if trimmed.is_empty() {
153 return Err(CoreError::InvalidInput(format!("{field} is required")));
154 }
155 Ok(trimmed.to_string())
156}
157
158fn parse_proxy_url(candidate: &str) -> Result<String> {
159 let parsed = url::Url::parse(candidate)
160 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?;
161 if parsed.scheme() != "https" && parsed.scheme() != "http" {
162 return Err(CoreError::InvalidInput("proxyUrl is invalid".to_string()));
163 }
164 Ok(parsed.to_string())
165}
166
167fn parse_pair_profile(profile: &PairProfile) -> Result<PairProfile> {
168 Ok(PairProfile {
169 agent_name: parse_non_empty(&profile.agent_name, "agentName")?,
170 human_name: parse_non_empty(&profile.human_name, "humanName")?,
171 proxy_origin: profile
172 .proxy_origin
173 .as_deref()
174 .map(|value| value.trim().to_string())
175 .filter(|value| !value.is_empty()),
176 })
177}
178
179pub fn parse_pairing_ticket(value: &str) -> Result<String> {
181 let mut ticket = value.trim().trim_matches('`').to_string();
182 ticket.retain(|character| !character.is_whitespace());
183 if !ticket.starts_with(PAIRING_TICKET_PREFIX) {
184 return Err(CoreError::InvalidInput(
185 "pairing ticket is invalid".to_string(),
186 ));
187 }
188
189 let encoded_payload = &ticket[PAIRING_TICKET_PREFIX.len()..];
190 if encoded_payload.is_empty() {
191 return Err(CoreError::InvalidInput(
192 "pairing ticket is invalid".to_string(),
193 ));
194 }
195
196 let payload_raw = URL_SAFE_NO_PAD
197 .decode(encoded_payload)
198 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
199 let payload_json = std::str::from_utf8(&payload_raw)
200 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
201 let _: serde_json::Value = serde_json::from_str(payload_json)
202 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
203
204 Ok(ticket)
205}
206
207pub fn parse_pairing_ticket_issuer_origin(ticket: &str) -> Result<String> {
209 let ticket = parse_pairing_ticket(ticket)?;
210 let encoded_payload = &ticket[PAIRING_TICKET_PREFIX.len()..];
211 let payload_raw = URL_SAFE_NO_PAD
212 .decode(encoded_payload)
213 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
214 let payload: serde_json::Value = serde_json::from_slice(&payload_raw)
215 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
216 let issuer = payload
217 .get("iss")
218 .and_then(|value| value.as_str())
219 .ok_or_else(|| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
220 let issuer_url = url::Url::parse(issuer)
221 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
222 if issuer_url.scheme() != "https" && issuer_url.scheme() != "http" {
223 return Err(CoreError::InvalidInput(
224 "pairing ticket is invalid".to_string(),
225 ));
226 }
227 Ok(issuer_url.origin().unicode_serialization())
228}
229
230pub fn assert_ticket_issuer_matches_proxy(ticket: &str, proxy_url: &str) -> Result<()> {
232 let issuer_origin = parse_pairing_ticket_issuer_origin(ticket)?;
233 let proxy_origin = url::Url::parse(proxy_url)
234 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?
235 .origin()
236 .unicode_serialization();
237 if issuer_origin != proxy_origin {
238 return Err(CoreError::InvalidInput(format!(
239 "pairing ticket issuer {issuer_origin} does not match proxy origin {proxy_origin}"
240 )));
241 }
242 Ok(())
243}
244
245fn read_local_agent_proof_material(
246 config_dir: &Path,
247 agent_name: &str,
248) -> Result<LocalAgentProofMaterial> {
249 let normalized_agent_name = parse_non_empty(agent_name, "agentName")?;
250 let agent_dir = config_dir.join(AGENTS_DIR).join(normalized_agent_name);
251 let ait_path = agent_dir.join(AIT_FILE_NAME);
252 let secret_key_path = agent_dir.join(SECRET_KEY_FILE_NAME);
253
254 let ait = fs::read_to_string(&ait_path).map_err(|source| CoreError::Io {
255 path: ait_path.clone(),
256 source,
257 })?;
258 let ait = parse_non_empty(&ait, AIT_FILE_NAME)?;
259 let secret_key_raw = fs::read_to_string(&secret_key_path).map_err(|source| CoreError::Io {
260 path: secret_key_path.clone(),
261 source,
262 })?;
263 let secret_key = decode_secret_key(secret_key_raw.trim())?;
264 let agent_did = parse_ait_agent_did(&ait)?;
265
266 Ok(LocalAgentProofMaterial {
267 ait,
268 secret_key,
269 agent_did,
270 })
271}
272
273fn parse_ait_agent_did(ait: &str) -> Result<String> {
274 let parts: Vec<&str> = ait.split('.').collect();
275 if parts.len() < 2 {
276 return Err(CoreError::InvalidInput("agent AIT is invalid".to_string()));
277 }
278 let payload = URL_SAFE_NO_PAD
279 .decode(parts[1])
280 .map_err(|_| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
281 let value: serde_json::Value = serde_json::from_slice(&payload)
282 .map_err(|_| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
283 let sub = value
284 .get("sub")
285 .and_then(|entry| entry.as_str())
286 .ok_or_else(|| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
287 parse_agent_did(sub)
288 .map_err(|_| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
289 Ok(sub.to_string())
290}
291
292fn to_request_url(proxy_url: &str, path: &str) -> Result<String> {
293 let normalized_proxy = parse_proxy_url(proxy_url)?;
294 let base = if normalized_proxy.ends_with('/') {
295 normalized_proxy
296 } else {
297 format!("{normalized_proxy}/")
298 };
299 let joined = url::Url::parse(&base)
300 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?
301 .join(path.trim_start_matches('/'))
302 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?;
303 Ok(joined.to_string())
304}
305
306fn to_path_with_query(url: &str) -> Result<String> {
307 let parsed = url::Url::parse(url)
308 .map_err(|_| CoreError::InvalidInput("requestUrl is invalid".to_string()))?;
309 Ok(match parsed.query() {
310 Some(query) => format!("{}?{query}", parsed.path()),
311 None => parsed.path().to_string(),
312 })
313}
314
315fn build_signed_headers(
316 method: &str,
317 request_url: &str,
318 body_bytes: &[u8],
319 secret_key: &ed25519_dalek::SigningKey,
320) -> Result<Vec<(String, String)>> {
321 let mut nonce_bytes = [0_u8; 24];
322 getrandom_fill(&mut nonce_bytes).map_err(|error| CoreError::InvalidInput(error.to_string()))?;
323 let nonce = URL_SAFE_NO_PAD.encode(nonce_bytes);
324 let timestamp = format!("{}", chrono::Utc::now().timestamp());
325 let signed = sign_http_request(&SignHttpRequestInput {
326 method,
327 path_with_query: &to_path_with_query(request_url)?,
328 timestamp: ×tamp,
329 nonce: &nonce,
330 body: body_bytes,
331 secret_key,
332 })?;
333 Ok(signed.headers)
334}
335
336fn parse_registry_message(payload: &serde_json::Value, fallback: &str) -> String {
337 payload
338 .get("error")
339 .and_then(|value| value.get("message"))
340 .and_then(|value| value.as_str())
341 .map(|value| value.to_string())
342 .unwrap_or_else(|| fallback.to_string())
343}
344
345fn execute_pair_request(
346 request_url: &str,
347 ait: &str,
348 body: serde_json::Value,
349 secret_key: &ed25519_dalek::SigningKey,
350) -> Result<serde_json::Value> {
351 let request_body = serde_json::to_string(&body)?;
352 let body_bytes = request_body.as_bytes();
353 let signed_headers = build_signed_headers("POST", request_url, body_bytes, secret_key)?;
354 tracing::debug!(request_url, "pair request sending");
355
356 let mut request = blocking_client()?
357 .post(request_url.to_string())
358 .header("authorization", format!("Claw {}", ait.trim()))
359 .header("content-type", "application/json");
360 for (header_name, header_value) in signed_headers {
361 request = request.header(header_name, header_value);
362 }
363
364 let response = request
365 .body(request_body)
366 .send()
367 .map_err(|error| CoreError::Http(error.to_string()))?;
368 let status = response.status().as_u16();
369 tracing::debug!(request_url, status, "pair request completed");
370 let payload: serde_json::Value = response
371 .json()
372 .map_err(|error| CoreError::Http(error.to_string()))?;
373 if status >= 400 {
374 return Err(CoreError::HttpStatus {
375 status,
376 message: parse_registry_message(&payload, "pairing request failed"),
377 });
378 }
379 Ok(payload)
380}
381
382fn resolve_peer_proxy_url(
383 ticket: &str,
384 profile: &PairProfile,
385 peer_proxy_origin: Option<String>,
386) -> Result<String> {
387 let resolved_origin = peer_proxy_origin
388 .and_then(|value| {
389 let trimmed = value.trim().to_string();
390 if trimmed.is_empty() {
391 None
392 } else {
393 Some(trimmed)
394 }
395 })
396 .or_else(|| profile.proxy_origin.clone())
397 .unwrap_or(parse_pairing_ticket_issuer_origin(ticket)?);
398 let origin = parse_proxy_url(&resolved_origin)?;
399 to_request_url(&origin, "/hooks/agent")
400}
401
402fn persist_confirmed_peer(
403 store: &SqliteStore,
404 config_dir: &Path,
405 ticket: &str,
406 peer_did: &str,
407 peer_profile: &PairProfile,
408 peer_proxy_origin: Option<String>,
409) -> Result<String> {
410 let peer_proxy_url = resolve_peer_proxy_url(ticket, peer_profile, peer_proxy_origin)?;
411 let record = persist_peer(
412 store,
413 PersistPeerInput {
414 alias: None,
415 did: peer_did.to_string(),
416 proxy_url: peer_proxy_url,
417 agent_name: Some(peer_profile.agent_name.clone()),
418 human_name: Some(peer_profile.human_name.clone()),
419 },
420 )?;
421 let peers_config = load_peers_config(store)?;
422 sync_openclaw_relay_peers_snapshot(config_dir, &peers_config)?;
423 Ok(record.alias)
424}
425
426pub fn start_pairing(
428 config_dir: &Path,
429 agent_name: &str,
430 proxy_url: &str,
431 initiator_profile: PairProfile,
432 ttl_seconds: Option<u64>,
433) -> Result<PairStartResult> {
434 let proof = read_local_agent_proof_material(config_dir, agent_name)?;
435 let request_url = to_request_url(proxy_url, PAIR_START_PATH)?;
436 let mut request_body = serde_json::Map::new();
437 request_body.insert(
438 "initiatorProfile".to_string(),
439 serde_json::to_value(parse_pair_profile(&initiator_profile)?)?,
440 );
441 if let Some(ttl_seconds) = ttl_seconds {
442 request_body.insert(
443 "ttlSeconds".to_string(),
444 serde_json::Value::Number(ttl_seconds.into()),
445 );
446 }
447 let payload = execute_pair_request(
448 &request_url,
449 &proof.ait,
450 serde_json::Value::Object(request_body),
451 &proof.secret_key,
452 )?;
453 let parsed: PairStartResponsePayload = serde_json::from_value(payload)
454 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
455 Ok(PairStartResult {
456 initiator_agent_did: parse_non_empty(&parsed.initiator_agent_did, "initiatorAgentDid")?,
457 initiator_profile: parse_pair_profile(&parsed.initiator_profile)?,
458 ticket: parse_pairing_ticket(&parsed.ticket)?,
459 expires_at: parse_non_empty(&parsed.expires_at, "expiresAt")?,
460 proxy_url: parse_proxy_url(proxy_url)?,
461 qr_path: None,
462 })
463}
464
465#[allow(clippy::too_many_lines)]
467pub fn confirm_pairing(
468 config_dir: &Path,
469 store: &SqliteStore,
470 agent_name: &str,
471 confirm_input: PairConfirmInput,
472 responder_profile: PairProfile,
473) -> Result<PairConfirmResult> {
474 let ticket = match confirm_input {
475 PairConfirmInput::Ticket(ticket) => parse_pairing_ticket(&ticket)?,
476 PairConfirmInput::QrFile(path) => {
477 let image = fs::read(&path).map_err(|source| CoreError::Io { path, source })?;
478 parse_pairing_ticket(&decode_ticket_from_png(&image)?)?
479 }
480 };
481
482 let proof = read_local_agent_proof_material(config_dir, agent_name)?;
483 let proxy_url = parse_pairing_ticket_issuer_origin(&ticket)?;
484 let request_url = to_request_url(&proxy_url, PAIR_CONFIRM_PATH)?;
485 let payload = execute_pair_request(
486 &request_url,
487 &proof.ait,
488 serde_json::json!({
489 "ticket": ticket,
490 "responderProfile": parse_pair_profile(&responder_profile)?,
491 }),
492 &proof.secret_key,
493 )?;
494
495 let parsed: PairConfirmResponsePayload = serde_json::from_value(payload)
496 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
497 if !parsed.paired {
498 return Err(CoreError::InvalidInput(
499 "pair confirm response is invalid".to_string(),
500 ));
501 }
502
503 let peer_alias = persist_confirmed_peer(
504 store,
505 config_dir,
506 &ticket,
507 &parsed.initiator_agent_did,
508 &parsed.initiator_profile,
509 parsed.initiator_profile.proxy_origin.clone(),
510 )?;
511
512 Ok(PairConfirmResult {
513 paired: true,
514 initiator_agent_did: parse_non_empty(&parsed.initiator_agent_did, "initiatorAgentDid")?,
515 initiator_profile: parse_pair_profile(&parsed.initiator_profile)?,
516 responder_agent_did: parse_non_empty(&parsed.responder_agent_did, "responderAgentDid")?,
517 responder_profile: parse_pair_profile(&parsed.responder_profile)?,
518 proxy_url,
519 peer_alias: Some(peer_alias),
520 })
521}
522
523#[allow(clippy::too_many_lines)]
524fn get_pairing_status_once(
525 config_dir: &Path,
526 store: &SqliteStore,
527 agent_name: &str,
528 proxy_url: &str,
529 ticket: &str,
530) -> Result<PairStatusResult> {
531 let ticket = parse_pairing_ticket(ticket)?;
532 assert_ticket_issuer_matches_proxy(&ticket, proxy_url)?;
533 let proof = read_local_agent_proof_material(config_dir, agent_name)?;
534 let request_url = to_request_url(proxy_url, PAIR_STATUS_PATH)?;
535 let payload = execute_pair_request(
536 &request_url,
537 &proof.ait,
538 serde_json::json!({ "ticket": ticket }),
539 &proof.secret_key,
540 )?;
541
542 let parsed: PairStatusResponsePayload = serde_json::from_value(payload)
543 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
544 let status = match parsed.status.as_str() {
545 "pending" => PairStatusKind::Pending,
546 "confirmed" => PairStatusKind::Confirmed,
547 _ => {
548 return Err(CoreError::InvalidInput(
549 "pair status response is invalid".to_string(),
550 ));
551 }
552 };
553
554 let mut peer_alias = None;
555 if status == PairStatusKind::Confirmed {
556 let responder_agent_did = parsed.responder_agent_did.clone().ok_or_else(|| {
557 CoreError::InvalidInput("pair status response is invalid".to_string())
558 })?;
559 let responder_profile = parsed.responder_profile.clone().ok_or_else(|| {
560 CoreError::InvalidInput("pair status response is invalid".to_string())
561 })?;
562
563 let (peer_did, peer_profile) = if proof.agent_did == parsed.initiator_agent_did {
564 (responder_agent_did, responder_profile)
565 } else if proof.agent_did == responder_agent_did {
566 (
567 parsed.initiator_agent_did.clone(),
568 parsed.initiator_profile.clone(),
569 )
570 } else {
571 return Err(CoreError::InvalidInput(
572 "local agent is not a pairing participant".to_string(),
573 ));
574 };
575
576 peer_alias = Some(persist_confirmed_peer(
577 store,
578 config_dir,
579 &ticket,
580 &peer_did,
581 &peer_profile,
582 peer_profile.proxy_origin.clone(),
583 )?);
584 }
585
586 Ok(PairStatusResult {
587 status,
588 initiator_agent_did: parsed.initiator_agent_did,
589 initiator_profile: parsed.initiator_profile,
590 responder_agent_did: parsed.responder_agent_did,
591 responder_profile: parsed.responder_profile,
592 expires_at: parsed.expires_at,
593 confirmed_at: parsed.confirmed_at,
594 proxy_url: parse_proxy_url(proxy_url)?,
595 peer_alias,
596 })
597}
598
599pub fn get_pairing_status(
601 config_dir: &Path,
602 store: &SqliteStore,
603 agent_name: &str,
604 proxy_url: &str,
605 ticket: &str,
606 options: PairStatusOptions,
607) -> Result<PairStatusResult> {
608 if !options.wait {
609 return get_pairing_status_once(config_dir, store, agent_name, proxy_url, ticket);
610 }
611
612 let wait_seconds = if options.wait_seconds == 0 {
613 DEFAULT_STATUS_WAIT_SECONDS
614 } else {
615 options.wait_seconds
616 };
617 let poll_interval_seconds = if options.poll_interval_seconds == 0 {
618 DEFAULT_STATUS_POLL_INTERVAL_SECONDS
619 } else {
620 options.poll_interval_seconds
621 };
622 let deadline = chrono::Utc::now().timestamp() + wait_seconds as i64;
623 loop {
624 let status = get_pairing_status_once(config_dir, store, agent_name, proxy_url, ticket)?;
625 if status.status == PairStatusKind::Confirmed {
626 return Ok(status);
627 }
628
629 if chrono::Utc::now().timestamp() >= deadline {
630 return Err(CoreError::InvalidInput(format!(
631 "pairing is still pending after {wait_seconds} seconds"
632 )));
633 }
634 std::thread::sleep(Duration::from_secs(poll_interval_seconds));
635 }
636}
637
// Unit tests are kept in `pairing_tests.rs` and compiled only for test builds.
#[cfg(test)]
#[path = "pairing_tests.rs"]
mod tests;