1use std::fs;
2use std::path::{Path, PathBuf};
3use std::time::Duration;
4
5use base64::Engine;
6use base64::engine::general_purpose::URL_SAFE_NO_PAD;
7use getrandom::fill as getrandom_fill;
8use serde::{Deserialize, Serialize};
9
10use crate::constants::{AGENTS_DIR, AIT_FILE_NAME, SECRET_KEY_FILE_NAME};
11use crate::db::SqliteStore;
12use crate::did::parse_agent_did;
13use crate::error::{CoreError, Result};
14use crate::http::blocking_client;
15use crate::identity::decode_secret_key;
16use crate::peers::{
17 PersistPeerInput, load_peers_config, persist_peer, sync_openclaw_relay_peers_snapshot,
18};
19use crate::qr::decode_ticket_from_png;
20use crate::signing::{SignHttpRequestInput, sign_http_request};
21
/// Path, relative to the proxy URL, of the endpoint that starts a pairing.
pub const PAIR_START_PATH: &str = "/pair/start";
/// Path, relative to the proxy URL, of the endpoint that confirms a pairing ticket.
pub const PAIR_CONFIRM_PATH: &str = "/pair/confirm";
/// Path, relative to the proxy URL, of the endpoint that reports a ticket's pairing status.
pub const PAIR_STATUS_PATH: &str = "/pair/status";
/// Literal prefix that every pairing ticket must start with.
pub const PAIRING_TICKET_PREFIX: &str = "clwpair1_";

/// Wait budget in seconds used when waiting is requested with a `0` budget.
pub const DEFAULT_STATUS_WAIT_SECONDS: u64 = 300;
/// Poll interval in seconds used when waiting is requested with a `0` interval.
pub const DEFAULT_STATUS_POLL_INTERVAL_SECONDS: u64 = 3;
29
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub struct PairProfile {
33 pub agent_name: String,
34 pub human_name: String,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub proxy_origin: Option<String>,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "camelCase")]
41pub struct PairStartResult {
42 pub initiator_agent_did: String,
43 pub initiator_profile: PairProfile,
44 pub ticket: String,
45 pub expires_at: String,
46 pub proxy_url: String,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub qr_path: Option<PathBuf>,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
52#[serde(rename_all = "camelCase")]
53pub struct PairConfirmResult {
54 pub paired: bool,
55 pub initiator_agent_did: String,
56 pub initiator_profile: PairProfile,
57 pub responder_agent_did: String,
58 pub responder_profile: PairProfile,
59 pub proxy_url: String,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 pub peer_alias: Option<String>,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "camelCase")]
66pub enum PairStatusKind {
67 Pending,
68 Confirmed,
69}
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "camelCase")]
73pub struct PairStatusResult {
74 pub status: PairStatusKind,
75 pub initiator_agent_did: String,
76 pub initiator_profile: PairProfile,
77 #[serde(skip_serializing_if = "Option::is_none")]
78 pub responder_agent_did: Option<String>,
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub responder_profile: Option<PairProfile>,
81 pub expires_at: String,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub confirmed_at: Option<String>,
84 pub proxy_url: String,
85 #[serde(skip_serializing_if = "Option::is_none")]
86 pub peer_alias: Option<String>,
87}
88
/// Source of the pairing ticket handed to a confirmation.
#[derive(Debug, Clone)]
pub enum PairConfirmInput {
    /// The ticket text itself.
    Ticket(String),
    /// Path of an image file from which the ticket is decoded.
    QrFile(PathBuf),
}
94
/// Controls whether and how a status query waits for confirmation.
#[derive(Debug, Clone)]
pub struct PairStatusOptions {
    /// When `false`, the status is queried exactly once.
    pub wait: bool,
    /// Maximum number of seconds to keep polling while the pairing is pending.
    pub wait_seconds: u64,
    /// Number of seconds to sleep between polls.
    pub poll_interval_seconds: u64,
}
101
102impl Default for PairStatusOptions {
103 fn default() -> Self {
104 Self {
105 wait: false,
106 wait_seconds: DEFAULT_STATUS_WAIT_SECONDS,
107 poll_interval_seconds: DEFAULT_STATUS_POLL_INTERVAL_SECONDS,
108 }
109 }
110}
111
112#[derive(Debug, Clone)]
113struct LocalAgentProofMaterial {
114 ait: String,
115 secret_key: ed25519_dalek::SigningKey,
116 agent_did: String,
117}
118
119#[derive(Debug, Deserialize)]
120#[serde(rename_all = "camelCase")]
121struct PairStartResponsePayload {
122 ticket: String,
123 initiator_agent_did: String,
124 initiator_profile: PairProfile,
125 expires_at: String,
126}
127
128#[derive(Debug, Deserialize)]
129#[serde(rename_all = "camelCase")]
130struct PairConfirmResponsePayload {
131 paired: bool,
132 initiator_agent_did: String,
133 initiator_profile: PairProfile,
134 responder_agent_did: String,
135 responder_profile: PairProfile,
136}
137
138#[derive(Debug, Deserialize)]
139#[serde(rename_all = "camelCase")]
140struct PairStatusResponsePayload {
141 status: String,
142 initiator_agent_did: String,
143 initiator_profile: PairProfile,
144 responder_agent_did: Option<String>,
145 responder_profile: Option<PairProfile>,
146 expires_at: String,
147 confirmed_at: Option<String>,
148}
149
150fn parse_non_empty(value: &str, field: &str) -> Result<String> {
151 let trimmed = value.trim();
152 if trimmed.is_empty() {
153 return Err(CoreError::InvalidInput(format!("{field} is required")));
154 }
155 Ok(trimmed.to_string())
156}
157
158fn parse_proxy_url(candidate: &str) -> Result<String> {
159 let parsed = url::Url::parse(candidate)
160 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?;
161 if parsed.scheme() != "https" && parsed.scheme() != "http" {
162 return Err(CoreError::InvalidInput("proxyUrl is invalid".to_string()));
163 }
164 Ok(parsed.to_string())
165}
166
167fn parse_pair_profile(profile: &PairProfile) -> Result<PairProfile> {
168 Ok(PairProfile {
169 agent_name: parse_non_empty(&profile.agent_name, "agentName")?,
170 human_name: parse_non_empty(&profile.human_name, "humanName")?,
171 proxy_origin: profile
172 .proxy_origin
173 .as_deref()
174 .map(|value| value.trim().to_string())
175 .filter(|value| !value.is_empty()),
176 })
177}
178
179pub fn parse_pairing_ticket(value: &str) -> Result<String> {
181 let mut ticket = value.trim().trim_matches('`').to_string();
182 ticket.retain(|character| !character.is_whitespace());
183 if !ticket.starts_with(PAIRING_TICKET_PREFIX) {
184 return Err(CoreError::InvalidInput(
185 "pairing ticket is invalid".to_string(),
186 ));
187 }
188
189 let encoded_payload = &ticket[PAIRING_TICKET_PREFIX.len()..];
190 if encoded_payload.is_empty() {
191 return Err(CoreError::InvalidInput(
192 "pairing ticket is invalid".to_string(),
193 ));
194 }
195
196 let payload_raw = URL_SAFE_NO_PAD
197 .decode(encoded_payload)
198 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
199 let payload_json = std::str::from_utf8(&payload_raw)
200 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
201 let _: serde_json::Value = serde_json::from_str(payload_json)
202 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
203
204 Ok(ticket)
205}
206
207pub fn parse_pairing_ticket_issuer_origin(ticket: &str) -> Result<String> {
209 let ticket = parse_pairing_ticket(ticket)?;
210 let encoded_payload = &ticket[PAIRING_TICKET_PREFIX.len()..];
211 let payload_raw = URL_SAFE_NO_PAD
212 .decode(encoded_payload)
213 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
214 let payload: serde_json::Value = serde_json::from_slice(&payload_raw)
215 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
216 let issuer = payload
217 .get("iss")
218 .and_then(|value| value.as_str())
219 .ok_or_else(|| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
220 let issuer_url = url::Url::parse(issuer)
221 .map_err(|_| CoreError::InvalidInput("pairing ticket is invalid".to_string()))?;
222 if issuer_url.scheme() != "https" && issuer_url.scheme() != "http" {
223 return Err(CoreError::InvalidInput(
224 "pairing ticket is invalid".to_string(),
225 ));
226 }
227 Ok(issuer_url.origin().unicode_serialization())
228}
229
230pub fn assert_ticket_issuer_matches_proxy(ticket: &str, proxy_url: &str) -> Result<()> {
232 let issuer_origin = parse_pairing_ticket_issuer_origin(ticket)?;
233 let proxy_origin = url::Url::parse(proxy_url)
234 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?
235 .origin()
236 .unicode_serialization();
237 if issuer_origin != proxy_origin {
238 return Err(CoreError::InvalidInput(format!(
239 "pairing ticket issuer {issuer_origin} does not match proxy origin {proxy_origin}"
240 )));
241 }
242 Ok(())
243}
244
245fn read_local_agent_proof_material(
246 config_dir: &Path,
247 agent_name: &str,
248) -> Result<LocalAgentProofMaterial> {
249 let normalized_agent_name = parse_non_empty(agent_name, "agentName")?;
250 let agent_dir = config_dir.join(AGENTS_DIR).join(normalized_agent_name);
251 let ait_path = agent_dir.join(AIT_FILE_NAME);
252 let secret_key_path = agent_dir.join(SECRET_KEY_FILE_NAME);
253
254 let ait = fs::read_to_string(&ait_path).map_err(|source| CoreError::Io {
255 path: ait_path.clone(),
256 source,
257 })?;
258 let ait = parse_non_empty(&ait, AIT_FILE_NAME)?;
259 let secret_key_raw = fs::read_to_string(&secret_key_path).map_err(|source| CoreError::Io {
260 path: secret_key_path.clone(),
261 source,
262 })?;
263 let secret_key = decode_secret_key(secret_key_raw.trim())?;
264 let agent_did = parse_ait_agent_did(&ait)?;
265
266 Ok(LocalAgentProofMaterial {
267 ait,
268 secret_key,
269 agent_did,
270 })
271}
272
273fn parse_ait_agent_did(ait: &str) -> Result<String> {
274 let parts: Vec<&str> = ait.split('.').collect();
275 if parts.len() < 2 {
276 return Err(CoreError::InvalidInput("agent AIT is invalid".to_string()));
277 }
278 let payload = URL_SAFE_NO_PAD
279 .decode(parts[1])
280 .map_err(|_| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
281 let value: serde_json::Value = serde_json::from_slice(&payload)
282 .map_err(|_| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
283 let sub = value
284 .get("sub")
285 .and_then(|entry| entry.as_str())
286 .ok_or_else(|| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
287 parse_agent_did(sub)
288 .map_err(|_| CoreError::InvalidInput("agent AIT is invalid".to_string()))?;
289 Ok(sub.to_string())
290}
291
292fn to_request_url(proxy_url: &str, path: &str) -> Result<String> {
293 let normalized_proxy = parse_proxy_url(proxy_url)?;
294 let base = if normalized_proxy.ends_with('/') {
295 normalized_proxy
296 } else {
297 format!("{normalized_proxy}/")
298 };
299 let joined = url::Url::parse(&base)
300 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?
301 .join(path.trim_start_matches('/'))
302 .map_err(|_| CoreError::InvalidInput("proxyUrl is invalid".to_string()))?;
303 Ok(joined.to_string())
304}
305
306fn to_path_with_query(url: &str) -> Result<String> {
307 let parsed = url::Url::parse(url)
308 .map_err(|_| CoreError::InvalidInput("requestUrl is invalid".to_string()))?;
309 Ok(match parsed.query() {
310 Some(query) => format!("{}?{query}", parsed.path()),
311 None => parsed.path().to_string(),
312 })
313}
314
315fn build_signed_headers(
316 method: &str,
317 request_url: &str,
318 body_bytes: &[u8],
319 secret_key: &ed25519_dalek::SigningKey,
320) -> Result<Vec<(String, String)>> {
321 let mut nonce_bytes = [0_u8; 24];
322 getrandom_fill(&mut nonce_bytes).map_err(|error| CoreError::InvalidInput(error.to_string()))?;
323 let nonce = URL_SAFE_NO_PAD.encode(nonce_bytes);
324 let timestamp = format!("{}", chrono::Utc::now().timestamp());
325 let signed = sign_http_request(&SignHttpRequestInput {
326 method,
327 path_with_query: &to_path_with_query(request_url)?,
328 timestamp: ×tamp,
329 nonce: &nonce,
330 body: body_bytes,
331 secret_key,
332 })?;
333 Ok(signed.headers)
334}
335
336fn parse_registry_message(payload: &serde_json::Value, fallback: &str) -> String {
337 payload
338 .get("error")
339 .and_then(|value| value.get("message"))
340 .and_then(|value| value.as_str())
341 .map(|value| value.to_string())
342 .unwrap_or_else(|| fallback.to_string())
343}
344
345fn execute_pair_request(
346 request_url: &str,
347 ait: &str,
348 body: serde_json::Value,
349 secret_key: &ed25519_dalek::SigningKey,
350) -> Result<serde_json::Value> {
351 let request_body = serde_json::to_string(&body)?;
352 let body_bytes = request_body.as_bytes();
353 let signed_headers = build_signed_headers("POST", request_url, body_bytes, secret_key)?;
354
355 let mut request = blocking_client()?
356 .post(request_url.to_string())
357 .header("authorization", format!("Claw {}", ait.trim()))
358 .header("content-type", "application/json");
359 for (header_name, header_value) in signed_headers {
360 request = request.header(header_name, header_value);
361 }
362
363 let response = request
364 .body(request_body)
365 .send()
366 .map_err(|error| CoreError::Http(error.to_string()))?;
367 let status = response.status().as_u16();
368 let payload: serde_json::Value = response
369 .json()
370 .map_err(|error| CoreError::Http(error.to_string()))?;
371 if status >= 400 {
372 return Err(CoreError::HttpStatus {
373 status,
374 message: parse_registry_message(&payload, "pairing request failed"),
375 });
376 }
377 Ok(payload)
378}
379
380fn resolve_peer_proxy_url(
381 ticket: &str,
382 profile: &PairProfile,
383 peer_proxy_origin: Option<String>,
384) -> Result<String> {
385 let resolved_origin = peer_proxy_origin
386 .and_then(|value| {
387 let trimmed = value.trim().to_string();
388 if trimmed.is_empty() {
389 None
390 } else {
391 Some(trimmed)
392 }
393 })
394 .or_else(|| profile.proxy_origin.clone())
395 .unwrap_or(parse_pairing_ticket_issuer_origin(ticket)?);
396 let origin = parse_proxy_url(&resolved_origin)?;
397 to_request_url(&origin, "/hooks/agent")
398}
399
400fn persist_confirmed_peer(
401 store: &SqliteStore,
402 config_dir: &Path,
403 ticket: &str,
404 peer_did: &str,
405 peer_profile: &PairProfile,
406 peer_proxy_origin: Option<String>,
407) -> Result<String> {
408 let peer_proxy_url = resolve_peer_proxy_url(ticket, peer_profile, peer_proxy_origin)?;
409 let record = persist_peer(
410 store,
411 PersistPeerInput {
412 alias: None,
413 did: peer_did.to_string(),
414 proxy_url: peer_proxy_url,
415 agent_name: Some(peer_profile.agent_name.clone()),
416 human_name: Some(peer_profile.human_name.clone()),
417 },
418 )?;
419 let peers_config = load_peers_config(store)?;
420 sync_openclaw_relay_peers_snapshot(config_dir, &peers_config)?;
421 Ok(record.alias)
422}
423
424pub fn start_pairing(
426 config_dir: &Path,
427 agent_name: &str,
428 proxy_url: &str,
429 initiator_profile: PairProfile,
430 ttl_seconds: Option<u64>,
431) -> Result<PairStartResult> {
432 let proof = read_local_agent_proof_material(config_dir, agent_name)?;
433 let request_url = to_request_url(proxy_url, PAIR_START_PATH)?;
434 let payload = execute_pair_request(
435 &request_url,
436 &proof.ait,
437 serde_json::json!({
438 "ttlSeconds": ttl_seconds,
439 "initiatorProfile": parse_pair_profile(&initiator_profile)?,
440 }),
441 &proof.secret_key,
442 )?;
443 let parsed: PairStartResponsePayload = serde_json::from_value(payload)
444 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
445 Ok(PairStartResult {
446 initiator_agent_did: parse_non_empty(&parsed.initiator_agent_did, "initiatorAgentDid")?,
447 initiator_profile: parse_pair_profile(&parsed.initiator_profile)?,
448 ticket: parse_pairing_ticket(&parsed.ticket)?,
449 expires_at: parse_non_empty(&parsed.expires_at, "expiresAt")?,
450 proxy_url: parse_proxy_url(proxy_url)?,
451 qr_path: None,
452 })
453}
454
455#[allow(clippy::too_many_lines)]
457pub fn confirm_pairing(
458 config_dir: &Path,
459 store: &SqliteStore,
460 agent_name: &str,
461 confirm_input: PairConfirmInput,
462 responder_profile: PairProfile,
463) -> Result<PairConfirmResult> {
464 let ticket = match confirm_input {
465 PairConfirmInput::Ticket(ticket) => parse_pairing_ticket(&ticket)?,
466 PairConfirmInput::QrFile(path) => {
467 let image = fs::read(&path).map_err(|source| CoreError::Io { path, source })?;
468 parse_pairing_ticket(&decode_ticket_from_png(&image)?)?
469 }
470 };
471
472 let proof = read_local_agent_proof_material(config_dir, agent_name)?;
473 let proxy_url = parse_pairing_ticket_issuer_origin(&ticket)?;
474 let request_url = to_request_url(&proxy_url, PAIR_CONFIRM_PATH)?;
475 let payload = execute_pair_request(
476 &request_url,
477 &proof.ait,
478 serde_json::json!({
479 "ticket": ticket,
480 "responderProfile": parse_pair_profile(&responder_profile)?,
481 }),
482 &proof.secret_key,
483 )?;
484
485 let parsed: PairConfirmResponsePayload = serde_json::from_value(payload)
486 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
487 if !parsed.paired {
488 return Err(CoreError::InvalidInput(
489 "pair confirm response is invalid".to_string(),
490 ));
491 }
492
493 let peer_alias = persist_confirmed_peer(
494 store,
495 config_dir,
496 &ticket,
497 &parsed.initiator_agent_did,
498 &parsed.initiator_profile,
499 parsed.initiator_profile.proxy_origin.clone(),
500 )?;
501
502 Ok(PairConfirmResult {
503 paired: true,
504 initiator_agent_did: parse_non_empty(&parsed.initiator_agent_did, "initiatorAgentDid")?,
505 initiator_profile: parse_pair_profile(&parsed.initiator_profile)?,
506 responder_agent_did: parse_non_empty(&parsed.responder_agent_did, "responderAgentDid")?,
507 responder_profile: parse_pair_profile(&parsed.responder_profile)?,
508 proxy_url,
509 peer_alias: Some(peer_alias),
510 })
511}
512
513#[allow(clippy::too_many_lines)]
514fn get_pairing_status_once(
515 config_dir: &Path,
516 store: &SqliteStore,
517 agent_name: &str,
518 proxy_url: &str,
519 ticket: &str,
520) -> Result<PairStatusResult> {
521 let ticket = parse_pairing_ticket(ticket)?;
522 assert_ticket_issuer_matches_proxy(&ticket, proxy_url)?;
523 let proof = read_local_agent_proof_material(config_dir, agent_name)?;
524 let request_url = to_request_url(proxy_url, PAIR_STATUS_PATH)?;
525 let payload = execute_pair_request(
526 &request_url,
527 &proof.ait,
528 serde_json::json!({ "ticket": ticket }),
529 &proof.secret_key,
530 )?;
531
532 let parsed: PairStatusResponsePayload = serde_json::from_value(payload)
533 .map_err(|error| CoreError::InvalidInput(error.to_string()))?;
534 let status = match parsed.status.as_str() {
535 "pending" => PairStatusKind::Pending,
536 "confirmed" => PairStatusKind::Confirmed,
537 _ => {
538 return Err(CoreError::InvalidInput(
539 "pair status response is invalid".to_string(),
540 ));
541 }
542 };
543
544 let mut peer_alias = None;
545 if status == PairStatusKind::Confirmed {
546 let responder_agent_did = parsed.responder_agent_did.clone().ok_or_else(|| {
547 CoreError::InvalidInput("pair status response is invalid".to_string())
548 })?;
549 let responder_profile = parsed.responder_profile.clone().ok_or_else(|| {
550 CoreError::InvalidInput("pair status response is invalid".to_string())
551 })?;
552
553 let (peer_did, peer_profile) = if proof.agent_did == parsed.initiator_agent_did {
554 (responder_agent_did, responder_profile)
555 } else if proof.agent_did == responder_agent_did {
556 (
557 parsed.initiator_agent_did.clone(),
558 parsed.initiator_profile.clone(),
559 )
560 } else {
561 return Err(CoreError::InvalidInput(
562 "local agent is not a pairing participant".to_string(),
563 ));
564 };
565
566 peer_alias = Some(persist_confirmed_peer(
567 store,
568 config_dir,
569 &ticket,
570 &peer_did,
571 &peer_profile,
572 peer_profile.proxy_origin.clone(),
573 )?);
574 }
575
576 Ok(PairStatusResult {
577 status,
578 initiator_agent_did: parsed.initiator_agent_did,
579 initiator_profile: parsed.initiator_profile,
580 responder_agent_did: parsed.responder_agent_did,
581 responder_profile: parsed.responder_profile,
582 expires_at: parsed.expires_at,
583 confirmed_at: parsed.confirmed_at,
584 proxy_url: parse_proxy_url(proxy_url)?,
585 peer_alias,
586 })
587}
588
589pub fn get_pairing_status(
591 config_dir: &Path,
592 store: &SqliteStore,
593 agent_name: &str,
594 proxy_url: &str,
595 ticket: &str,
596 options: PairStatusOptions,
597) -> Result<PairStatusResult> {
598 if !options.wait {
599 return get_pairing_status_once(config_dir, store, agent_name, proxy_url, ticket);
600 }
601
602 let wait_seconds = if options.wait_seconds == 0 {
603 DEFAULT_STATUS_WAIT_SECONDS
604 } else {
605 options.wait_seconds
606 };
607 let poll_interval_seconds = if options.poll_interval_seconds == 0 {
608 DEFAULT_STATUS_POLL_INTERVAL_SECONDS
609 } else {
610 options.poll_interval_seconds
611 };
612 let deadline = chrono::Utc::now().timestamp() + wait_seconds as i64;
613 loop {
614 let status = get_pairing_status_once(config_dir, store, agent_name, proxy_url, ticket)?;
615 if status.status == PairStatusKind::Confirmed {
616 return Ok(status);
617 }
618
619 if chrono::Utc::now().timestamp() >= deadline {
620 return Err(CoreError::InvalidInput(format!(
621 "pairing is still pending after {wait_seconds} seconds"
622 )));
623 }
624 std::thread::sleep(Duration::from_secs(poll_interval_seconds));
625 }
626}
627
// Unit tests are kept in a separate file and compiled only for test builds.
#[cfg(test)]
#[path = "pairing_tests.rs"]
mod tests;