Skip to main content

noxy_sdk/
client.rs

1use crate::config::NoxyConfig;
2use crate::decision_outcome::{
3    SendDecisionAndWaitNoDecisionIdError, SendDecisionAndWaitOptions, WaitForDecisionOutcomeOptions,
4    WaitForDecisionOutcomeTimeoutError,
5};
6use crate::kyber_provider::KyberProvider;
7use crate::services::{DecisionService, IdentityService, QuotaService};
8use crate::transport::proto;
9use crate::transport::AgentServiceClient;
10use crate::types::{
11    NoxyDeliveryOutcome, NoxyGetDecisionOutcomeResponse, NoxyGetQuotaResponse, NoxyHumanDecisionOutcome,
12    NoxyIdentityAddress,
13};
14use tonic::metadata::AsciiMetadataValue;
15use tonic::transport::Channel;
16use tonic::Request;
17use uuid::Uuid;
18
19pub struct NoxyAgentClient {
20    config: NoxyConfig,
21    channel: Channel,
22    auth_value: AsciiMetadataValue,
23    decision_service: DecisionService,
24    identity_service: IdentityService,
25    quota_service: QuotaService,
26}
27
28impl NoxyAgentClient {
29    pub(crate) fn new(
30        config: NoxyConfig,
31        channel: Channel,
32        kyber_provider: KyberProvider,
33    ) -> Self {
34        let auth_value: AsciiMetadataValue = format!("Bearer {}", config.auth_token)
35            .parse()
36            .expect("valid auth token");
37        let decision_service = DecisionService::new(kyber_provider);
38        let identity_service = IdentityService::new();
39        let quota_service = QuotaService::new();
40        Self {
41            config,
42            channel,
43            auth_value,
44            decision_service,
45            identity_service,
46            quota_service,
47        }
48    }
49
50    fn create_client(&self) -> AgentServiceClient<Channel> {
51        AgentServiceClient::new(self.channel.clone())
52    }
53
54    /// Route an actionable decision to all devices registered for the identity.
55    /// Uses one client-generated `decision_id` (UUID) for the whole batch so every device shares the same logical decision.
56    pub async fn send_decision<T>(
57        &self,
58        identity_address: NoxyIdentityAddress,
59        actionable_decision: &T,
60    ) -> Result<Vec<NoxyDeliveryOutcome>, Box<dyn std::error::Error + Send + Sync>>
61    where
62        T: serde::Serialize,
63    {
64        let mut client = self.create_client();
65        let devices = self
66            .identity_service
67            .get_devices(&mut client, &identity_address, &self.auth_value)
68            .await?;
69        self.decision_service
70            .send(
71                &mut client,
72                &devices,
73                actionable_decision,
74                self.config.decision_ttl_seconds,
75                &self.auth_value,
76            )
77            .await
78    }
79
80    /// Single poll for human-in-the-loop resolution.
81    pub async fn get_decision_outcome(
82        &self,
83        decision_id: &str,
84        identity_id: &str,
85    ) -> Result<NoxyGetDecisionOutcomeResponse, tonic::Status> {
86        let mut client = self.create_client();
87        let req = proto::GetDecisionOutcomeRequest {
88            request_id: Uuid::new_v4().to_string(),
89            decision_id: decision_id.to_string(),
90            identity_id: identity_id.to_string(),
91        };
92        let mut request = Request::new(req);
93        request
94            .metadata_mut()
95            .insert("authorization", self.auth_value.clone());
96        let response = client.get_decision_outcome(request).await?.into_inner();
97        Ok(NoxyGetDecisionOutcomeResponse {
98            request_id: response.request_id,
99            pending: response.pending,
100            outcome: NoxyHumanDecisionOutcome::from(response.outcome),
101        })
102    }
103
104    /// Poll `GetDecisionOutcome` with exponential backoff until terminal outcome or `pending == false`.
105    pub async fn wait_for_decision_outcome(
106        &self,
107        options: WaitForDecisionOutcomeOptions,
108    ) -> Result<NoxyGetDecisionOutcomeResponse, Box<dyn std::error::Error + Send + Sync>> {
109        let initial_poll_interval_ms = options.initial_poll_interval_ms.unwrap_or(400);
110        let max_poll_interval_ms = options.max_poll_interval_ms.unwrap_or(30_000);
111        let max_wait_ms = options.max_wait_ms.unwrap_or(900_000);
112        let backoff_multiplier = options.backoff_multiplier.unwrap_or(1.6);
113
114        let started = std::time::Instant::now();
115        let mut interval = initial_poll_interval_ms;
116        let decision_id = options.decision_id.clone();
117        let identity_id = options.identity_id.clone();
118
119        loop {
120            if started.elapsed().as_millis() as u64 > max_wait_ms {
121                return Err(Box::new(WaitForDecisionOutcomeTimeoutError));
122            }
123
124            let raw = self
125                .get_decision_outcome(&decision_id, &identity_id)
126                .await
127                .map_err(|e| format!("GetDecisionOutcome: {}", e))?;
128
129            if crate::decision_outcome::is_terminal_human_outcome(raw.outcome) {
130                return Ok(raw);
131            }
132            if !raw.pending {
133                return Ok(raw);
134            }
135
136            tokio::time::sleep(std::time::Duration::from_millis(
137                interval.min(max_poll_interval_ms),
138            ))
139            .await;
140            interval = ((interval as f64 * backoff_multiplier) as u64).min(max_poll_interval_ms);
141        }
142    }
143
144    /// [`send_decision`] then [`wait_for_decision_outcome`] using the first delivery with a non-empty `decision_id`.
145    pub async fn send_decision_and_wait_for_outcome<T>(
146        &self,
147        identity_address: NoxyIdentityAddress,
148        actionable_decision: &T,
149        options: Option<SendDecisionAndWaitOptions>,
150    ) -> Result<NoxyGetDecisionOutcomeResponse, Box<dyn std::error::Error + Send + Sync>>
151    where
152        T: serde::Serialize,
153    {
154        let deliveries = self
155            .send_decision(identity_address.clone(), actionable_decision)
156            .await?;
157        let with_id = deliveries.iter().find(|d| !d.decision_id.is_empty());
158        let Some(d) = with_id else {
159            return Err(Box::new(SendDecisionAndWaitNoDecisionIdError));
160        };
161        let o = options.unwrap_or_default();
162        self.wait_for_decision_outcome(WaitForDecisionOutcomeOptions {
163            decision_id: d.decision_id.clone(),
164            identity_id: identity_address,
165            initial_poll_interval_ms: o.initial_poll_interval_ms,
166            max_poll_interval_ms: o.max_poll_interval_ms,
167            max_wait_ms: o.max_wait_ms,
168            backoff_multiplier: o.backoff_multiplier,
169        })
170        .await
171    }
172
173    pub async fn get_quota(&self) -> Result<NoxyGetQuotaResponse, tonic::Status> {
174        let mut client = self.create_client();
175        self.quota_service
176            .get(&mut client, &self.auth_value)
177            .await
178    }
179}