runtara_workflow_stdlib/
connections.rs1use serde::{Deserialize, Serialize};
30use serde_json::Value;
31use std::time::Duration;
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct ConnectionResponse {
36 pub parameters: Value,
38
39 pub integration_id: String,
41
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub connection_subtype: Option<String>,
45
46 #[serde(skip_serializing_if = "Option::is_none")]
48 pub rate_limit: Option<RateLimitState>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct RateLimitState {
54 pub is_limited: bool,
56
57 #[serde(skip_serializing_if = "Option::is_none")]
59 pub remaining: Option<u32>,
60
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub reset_at: Option<i64>,
64
65 #[serde(skip_serializing_if = "Option::is_none")]
67 pub retry_after_ms: Option<u64>,
68}
69
70#[derive(Debug)]
72pub enum ConnectionError {
73 NotFound(String),
75 RateLimited {
77 connection_id: String,
78 retry_after: Duration,
79 },
80 FetchError(String),
82 InvalidResponse(String),
84}
85
86impl std::fmt::Display for ConnectionError {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match self {
89 ConnectionError::NotFound(id) => write!(f, "Connection '{}' not found", id),
90 ConnectionError::RateLimited {
91 connection_id,
92 retry_after,
93 } => write!(
94 f,
95 "Connection '{}' is rate limited, retry after {:?}",
96 connection_id, retry_after
97 ),
98 ConnectionError::FetchError(msg) => write!(f, "Failed to fetch connection: {}", msg),
99 ConnectionError::InvalidResponse(msg) => {
100 write!(f, "Invalid connection response: {}", msg)
101 }
102 }
103 }
104}
105
106impl std::error::Error for ConnectionError {}
107
108pub fn fetch_connection(
118 service_url: &str,
119 tenant_id: &str,
120 connection_id: &str,
121) -> Result<ConnectionResponse, ConnectionError> {
122 let url = format!("{}/{}/{}", service_url, tenant_id, connection_id);
123
124 let response = ureq::get(&url)
125 .timeout(Duration::from_secs(30))
126 .call()
127 .map_err(|e| ConnectionError::FetchError(e.to_string()))?;
128
129 if response.status() == 404 {
130 return Err(ConnectionError::NotFound(connection_id.to_string()));
131 }
132
133 if response.status() == 429 {
134 let retry_after = response
136 .header("Retry-After")
137 .and_then(|h| h.parse::<u64>().ok())
138 .unwrap_or(60);
139 return Err(ConnectionError::RateLimited {
140 connection_id: connection_id.to_string(),
141 retry_after: Duration::from_secs(retry_after),
142 });
143 }
144
145 if response.status() != 200 {
146 return Err(ConnectionError::FetchError(format!(
147 "HTTP {}",
148 response.status()
149 )));
150 }
151
152 let body = response
153 .into_string()
154 .map_err(|e| ConnectionError::FetchError(e.to_string()))?;
155
156 serde_json::from_str(&body).map_err(|e| ConnectionError::InvalidResponse(e.to_string()))
157}
158
159impl RateLimitState {
160 pub fn wait_duration(&self) -> Duration {
162 if let Some(ms) = self.retry_after_ms {
163 return Duration::from_millis(ms);
164 }
165
166 if let Some(reset_at) = self.reset_at {
167 let now = std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_secs() as i64;
171
172 if reset_at > now {
173 return Duration::from_secs((reset_at - now) as u64);
174 }
175 }
176
177 Duration::from_secs(60)
179 }
180}