alpine_protocol_sdk/handshake/
client.rs1use std::net::SocketAddr;
2
3use alpine::crypto::identity::NodeCredentials;
4use alpine::messages::{CapabilitySet, DeviceIdentity};
5
6use crate::discovery::{DiscoveryAttempt, DiscoveryOutcome, DiscoveryRunReport};
7use crate::error::AlpineSdkError;
8use crate::session::{AlpineClient, ProbeOptions, ProbeState};
9use std::time::Instant;
10use tracing::warn;
11
12#[derive(Debug, Clone)]
13pub struct ConnectPolicy {
14 pub require_trust: bool,
15 pub require_probe: bool,
16 pub allow_degraded: bool,
17 pub probe_options: ProbeOptions,
18}
19
20impl Default for ConnectPolicy {
21 fn default() -> Self {
22 Self {
23 require_trust: true,
24 require_probe: true,
25 allow_degraded: false,
26 probe_options: ProbeOptions::default(),
27 }
28 }
29}
30
31#[derive(Debug)]
32pub enum HandshakeReportResult {
33 Connected(AlpineClient),
34 Failed(AlpineSdkError),
35}
36
37#[derive(Debug, Clone)]
38pub struct TraceStep {
39 pub label: String,
40 pub elapsed_ms: u128,
41}
42
43#[derive(Debug, Clone)]
44pub struct TraceTimeline {
45 pub steps: Vec<TraceStep>,
46}
47
48impl TraceTimeline {
49 pub fn new() -> Self {
50 Self { steps: Vec::new() }
51 }
52
53 pub fn push(&mut self, label: impl Into<String>, started: Instant) {
54 self.steps.push(TraceStep {
55 label: label.into(),
56 elapsed_ms: started.elapsed().as_millis(),
57 });
58 }
59}
60
61#[must_use]
62#[derive(Debug)]
63pub struct HandshakeReport {
64 pub run_id: Option<String>,
65 pub remote_addr: Option<SocketAddr>,
66 pub device_id: Option<String>,
67 pub policy: ConnectPolicy,
68 pub probe: Option<crate::session::ProbeResult>,
69 pub discovery_attempts: Vec<DiscoveryAttempt>,
70 pub result: HandshakeReportResult,
71 pub timeline: TraceTimeline,
72}
73
74impl HandshakeReport {
75 pub fn into_result(self) -> Result<AlpineClient, AlpineSdkError> {
76 match self.result {
77 HandshakeReportResult::Connected(client) => Ok(client),
78 HandshakeReportResult::Failed(err) => Err(err),
79 }
80 }
81}
82
83pub async fn connect(
85 local_addr: SocketAddr,
86 remote_addr: SocketAddr,
87 identity: DeviceIdentity,
88 capabilities: CapabilitySet,
89 credentials: NodeCredentials,
90) -> Result<AlpineClient, AlpineSdkError> {
91 AlpineClient::connect(local_addr, remote_addr, identity, capabilities, credentials).await
92}
93
94pub async fn connect_with_policy(
96 local_addr: SocketAddr,
97 outcome: DiscoveryOutcome,
98 credentials: NodeCredentials,
99 policy: ConnectPolicy,
100) -> Result<AlpineClient, AlpineSdkError> {
101 let run_id = outcome.run_id.clone();
102 let outcome = if policy.require_trust {
103 outcome.require_trusted()?.outcome
104 } else {
105 outcome
106 };
107 let protocol_report = outcome.protocol_report();
108 if !protocol_report.compatible {
109 return Err(AlpineSdkError::IncompatibleProtocol(protocol_report.note));
110 }
111 let identity = DeviceIdentity {
112 device_id: outcome.reply.device_id.clone(),
113 manufacturer_id: outcome.reply.manufacturer_id.clone(),
114 model_id: outcome.reply.model_id.clone(),
115 hardware_rev: outcome.reply.hardware_rev.clone(),
116 firmware_rev: outcome.reply.firmware_rev.clone(),
117 };
118 let capabilities = outcome.reply.capabilities.clone();
119 let client = AlpineClient::connect(
120 local_addr,
121 outcome.peer,
122 identity,
123 capabilities,
124 credentials,
125 )
126 .await
127 .map_err(|err| {
128 warn_clock_skew_if_needed(&err);
129 err
130 })?;
131 let mut client = client;
132 client.set_run_id(run_id);
133
134 if policy.require_probe {
135 let probe = client
136 .probe_status_with_options(policy.probe_options.clone())
137 .await;
138 let acceptable = match probe.state {
139 ProbeState::Online => true,
140 ProbeState::Degraded => policy.allow_degraded,
141 ProbeState::Offline => false,
142 };
143 if !acceptable {
144 let detail = probe
145 .detail
146 .or_else(|| probe.errors.first().map(|err| err.message.clone()))
147 .unwrap_or_else(|| format!("probe state {:?}", probe.state));
148 return Err(AlpineSdkError::ProbeFailed(detail));
149 }
150 }
151
152 Ok(client)
153}
154
155pub async fn connect_with_policy_report(
157 local_addr: SocketAddr,
158 outcome: DiscoveryOutcome,
159 credentials: NodeCredentials,
160 policy: ConnectPolicy,
161) -> HandshakeReport {
162 let started = Instant::now();
163 let mut timeline = TraceTimeline::new();
164 let device_id = Some(outcome.reply.device_id.clone());
165 let remote_addr = Some(outcome.peer);
166 let run_id = Some(outcome.run_id.clone());
167 let mut probe = None;
168 let outcome = if policy.require_trust {
169 match outcome.require_trusted() {
170 Ok(trusted) => trusted.outcome,
171 Err(err) => {
172 timeline.push("trust_failed", started);
173 return HandshakeReport {
174 run_id,
175 remote_addr,
176 device_id,
177 policy,
178 probe: None,
179 discovery_attempts: Vec::new(),
180 result: HandshakeReportResult::Failed(err),
181 timeline,
182 };
183 }
184 }
185 } else {
186 timeline.push("trust_skipped", started);
187 outcome
188 };
189 let protocol_report = outcome.protocol_report();
190 if !protocol_report.compatible {
191 timeline.push("protocol_mismatch", started);
192 return HandshakeReport {
193 run_id,
194 remote_addr,
195 device_id,
196 policy,
197 probe: None,
198 discovery_attempts: Vec::new(),
199 result: HandshakeReportResult::Failed(AlpineSdkError::IncompatibleProtocol(
200 protocol_report.note,
201 )),
202 timeline,
203 };
204 }
205 let identity = DeviceIdentity {
206 device_id: outcome.reply.device_id.clone(),
207 manufacturer_id: outcome.reply.manufacturer_id.clone(),
208 model_id: outcome.reply.model_id.clone(),
209 hardware_rev: outcome.reply.hardware_rev.clone(),
210 firmware_rev: outcome.reply.firmware_rev.clone(),
211 };
212 let capabilities = outcome.reply.capabilities.clone();
213 let client = match AlpineClient::connect(
214 local_addr,
215 outcome.peer,
216 identity,
217 capabilities,
218 credentials,
219 )
220 .await
221 {
222 Ok(client) => client,
223 Err(err) => {
224 warn_clock_skew_if_needed(&err);
225 timeline.push("handshake_failed", started);
226 return HandshakeReport {
227 run_id,
228 remote_addr,
229 device_id,
230 policy,
231 probe: None,
232 discovery_attempts: Vec::new(),
233 result: HandshakeReportResult::Failed(err),
234 timeline,
235 };
236 }
237 };
238
239 let mut client = client;
240 if let Some(run_id) = run_id.clone() {
241 client.set_run_id(run_id);
242 }
243 timeline.push("handshake_connected", started);
244 let result = if policy.require_probe {
245 let probe_result = client
246 .probe_status_with_options(policy.probe_options.clone())
247 .await;
248 let acceptable = match probe_result.state {
249 ProbeState::Online => true,
250 ProbeState::Degraded => policy.allow_degraded,
251 ProbeState::Offline => false,
252 };
253 probe = Some(probe_result.clone());
254 if !acceptable {
255 let detail = probe_result
256 .detail
257 .clone()
258 .or_else(|| probe_result.errors.first().map(|err| err.message.clone()))
259 .unwrap_or_else(|| format!("probe state {:?}", probe_result.state));
260 client.close().await;
261 timeline.push("probe_failed", started);
262 HandshakeReportResult::Failed(AlpineSdkError::ProbeFailed(detail))
263 } else {
264 timeline.push("probe_ok", started);
265 HandshakeReportResult::Connected(client)
266 }
267 } else {
268 timeline.push("probe_skipped", started);
269 HandshakeReportResult::Connected(client)
270 };
271
272 HandshakeReport {
273 run_id,
274 remote_addr,
275 device_id,
276 policy,
277 probe,
278 discovery_attempts: Vec::new(),
279 result,
280 timeline,
281 }
282}
283
284pub async fn connect_with_policy_from_report(
286 local_addr: SocketAddr,
287 report: DiscoveryRunReport,
288 credentials: NodeCredentials,
289 policy: ConnectPolicy,
290) -> Result<AlpineClient, AlpineSdkError> {
291 let outcome = match report.result {
292 Ok(outcome) => outcome,
293 Err(err) => {
294 let attempts = format_attempts(&report.attempts);
295 return Err(AlpineSdkError::DiscoveryFailed(format!(
296 "{}; attempts: {}",
297 err, attempts
298 )));
299 }
300 };
301
302 match connect_with_policy(local_addr, outcome, credentials, policy).await {
303 Ok(client) => Ok(client),
304 Err(AlpineSdkError::ProbeFailed(detail)) => {
305 let attempts = format_attempts(&report.attempts);
306 Err(AlpineSdkError::ProbeFailed(format!(
307 "{}; attempts: {}",
308 detail, attempts
309 )))
310 }
311 Err(err) => Err(err),
312 }
313}
314
315pub async fn connect_with_policy_from_report_report(
317 local_addr: SocketAddr,
318 report: DiscoveryRunReport,
319 credentials: NodeCredentials,
320 policy: ConnectPolicy,
321) -> HandshakeReport {
322 let attempts = report.attempts.clone();
323 match report.result {
324 Ok(outcome) => {
325 let mut handshake =
326 connect_with_policy_report(local_addr, outcome, credentials, policy).await;
327 handshake.discovery_attempts = attempts;
328 handshake
329 }
330 Err(err) => {
331 let detail = format_attempts(&attempts);
332 HandshakeReport {
333 run_id: None,
334 remote_addr: None,
335 device_id: None,
336 policy,
337 probe: None,
338 discovery_attempts: attempts,
339 result: HandshakeReportResult::Failed(AlpineSdkError::DiscoveryFailed(format!(
340 "{}; attempts: {}",
341 err, detail
342 ))),
343 timeline: TraceTimeline::new(),
344 }
345 }
346 }
347}
348
349fn format_attempts(attempts: &[DiscoveryAttempt]) -> String {
350 if attempts.is_empty() {
351 return "none".into();
352 }
353 let mut rendered = Vec::new();
354 for attempt in attempts {
355 let error = attempt.error.as_ref().map(|err| err.label).unwrap_or("ok");
356 rendered.push(format!(
357 "{} {} {}",
358 attempt.mode.as_str(),
359 attempt.target,
360 error
361 ));
362 }
363 rendered.join(", ")
364}
365
366fn warn_clock_skew_if_needed(err: &AlpineSdkError) {
367 if let AlpineSdkError::HandshakeFailed(detail) = err {
368 let lowered = detail.to_ascii_lowercase();
369 if lowered.contains("signature")
370 || lowered.contains("expired")
371 || lowered.contains("timestamp")
372 {
373 warn!(
374 "[ALPINE][HANDSHAKE][WARN] signature validation failed; clock skew may be causing auth errors"
375 );
376 }
377 }
378}