1use crate::config::NoxyConfig;
2use crate::decision_outcome::{
3 SendDecisionAndWaitNoDecisionIdError, SendDecisionAndWaitOptions, WaitForDecisionOutcomeOptions,
4 WaitForDecisionOutcomeTimeoutError,
5};
6use crate::kyber_provider::KyberProvider;
7use crate::services::{DecisionService, IdentityService, QuotaService};
8use crate::transport::proto;
9use crate::transport::AgentServiceClient;
10use crate::types::{
11 NoxyDeliveryOutcome, NoxyGetDecisionOutcomeResponse, NoxyGetQuotaResponse, NoxyHumanDecisionOutcome,
12 NoxyIdentityAddress,
13};
14use tonic::metadata::AsciiMetadataValue;
15use tonic::transport::Channel;
16use tonic::Request;
17use uuid::Uuid;
18
19pub struct NoxyAgentClient {
20 config: NoxyConfig,
21 channel: Channel,
22 auth_value: AsciiMetadataValue,
23 decision_service: DecisionService,
24 identity_service: IdentityService,
25 quota_service: QuotaService,
26}
27
28impl NoxyAgentClient {
29 pub(crate) fn new(
30 config: NoxyConfig,
31 channel: Channel,
32 kyber_provider: KyberProvider,
33 ) -> Self {
34 let auth_value: AsciiMetadataValue = format!("Bearer {}", config.auth_token)
35 .parse()
36 .expect("valid auth token");
37 let decision_service = DecisionService::new(kyber_provider);
38 let identity_service = IdentityService::new();
39 let quota_service = QuotaService::new();
40 Self {
41 config,
42 channel,
43 auth_value,
44 decision_service,
45 identity_service,
46 quota_service,
47 }
48 }
49
50 fn create_client(&self) -> AgentServiceClient<Channel> {
51 AgentServiceClient::new(self.channel.clone())
52 }
53
54 pub async fn send_decision<T>(
57 &self,
58 identity_address: NoxyIdentityAddress,
59 actionable_decision: &T,
60 ) -> Result<Vec<NoxyDeliveryOutcome>, Box<dyn std::error::Error + Send + Sync>>
61 where
62 T: serde::Serialize,
63 {
64 let mut client = self.create_client();
65 let devices = self
66 .identity_service
67 .get_devices(&mut client, &identity_address, &self.auth_value)
68 .await?;
69 self.decision_service
70 .send(
71 &mut client,
72 &devices,
73 actionable_decision,
74 self.config.decision_ttl_seconds,
75 &self.auth_value,
76 )
77 .await
78 }
79
80 pub async fn get_decision_outcome(
82 &self,
83 decision_id: &str,
84 identity_id: &str,
85 ) -> Result<NoxyGetDecisionOutcomeResponse, tonic::Status> {
86 let mut client = self.create_client();
87 let req = proto::GetDecisionOutcomeRequest {
88 request_id: Uuid::new_v4().to_string(),
89 decision_id: decision_id.to_string(),
90 identity_id: identity_id.to_string(),
91 };
92 let mut request = Request::new(req);
93 request
94 .metadata_mut()
95 .insert("authorization", self.auth_value.clone());
96 let response = client.get_decision_outcome(request).await?.into_inner();
97 Ok(NoxyGetDecisionOutcomeResponse {
98 request_id: response.request_id,
99 pending: response.pending,
100 outcome: NoxyHumanDecisionOutcome::from(response.outcome),
101 })
102 }
103
104 pub async fn wait_for_decision_outcome(
106 &self,
107 options: WaitForDecisionOutcomeOptions,
108 ) -> Result<NoxyGetDecisionOutcomeResponse, Box<dyn std::error::Error + Send + Sync>> {
109 let initial_poll_interval_ms = options.initial_poll_interval_ms.unwrap_or(400);
110 let max_poll_interval_ms = options.max_poll_interval_ms.unwrap_or(30_000);
111 let max_wait_ms = options.max_wait_ms.unwrap_or(900_000);
112 let backoff_multiplier = options.backoff_multiplier.unwrap_or(1.6);
113
114 let started = std::time::Instant::now();
115 let mut interval = initial_poll_interval_ms;
116 let decision_id = options.decision_id.clone();
117 let identity_id = options.identity_id.clone();
118
119 loop {
120 if started.elapsed().as_millis() as u64 > max_wait_ms {
121 return Err(Box::new(WaitForDecisionOutcomeTimeoutError));
122 }
123
124 let raw = self
125 .get_decision_outcome(&decision_id, &identity_id)
126 .await
127 .map_err(|e| format!("GetDecisionOutcome: {}", e))?;
128
129 if crate::decision_outcome::is_terminal_human_outcome(raw.outcome) {
130 return Ok(raw);
131 }
132 if !raw.pending {
133 return Ok(raw);
134 }
135
136 tokio::time::sleep(std::time::Duration::from_millis(
137 interval.min(max_poll_interval_ms),
138 ))
139 .await;
140 interval = ((interval as f64 * backoff_multiplier) as u64).min(max_poll_interval_ms);
141 }
142 }
143
144 pub async fn send_decision_and_wait_for_outcome<T>(
146 &self,
147 identity_address: NoxyIdentityAddress,
148 actionable_decision: &T,
149 options: Option<SendDecisionAndWaitOptions>,
150 ) -> Result<NoxyGetDecisionOutcomeResponse, Box<dyn std::error::Error + Send + Sync>>
151 where
152 T: serde::Serialize,
153 {
154 let deliveries = self
155 .send_decision(identity_address.clone(), actionable_decision)
156 .await?;
157 let with_id = deliveries.iter().find(|d| !d.decision_id.is_empty());
158 let Some(d) = with_id else {
159 return Err(Box::new(SendDecisionAndWaitNoDecisionIdError));
160 };
161 let o = options.unwrap_or_default();
162 self.wait_for_decision_outcome(WaitForDecisionOutcomeOptions {
163 decision_id: d.decision_id.clone(),
164 identity_id: identity_address,
165 initial_poll_interval_ms: o.initial_poll_interval_ms,
166 max_poll_interval_ms: o.max_poll_interval_ms,
167 max_wait_ms: o.max_wait_ms,
168 backoff_multiplier: o.backoff_multiplier,
169 })
170 .await
171 }
172
173 pub async fn get_quota(&self) -> Result<NoxyGetQuotaResponse, tonic::Status> {
174 let mut client = self.create_client();
175 self.quota_service
176 .get(&mut client, &self.auth_value)
177 .await
178 }
179}