runtara_workflow_stdlib/
connections.rs

1// Copyright (C) 2025 SyncMyOrders Sp. z o.o.
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! Connection management for workflows.
4//!
5//! This module provides functionality to fetch connections from an external
6//! connection service at runtime. The connection service URL is configured
7//! at compilation time and baked into the generated workflow binary.
8//!
9//! ## Connection Service Protocol
10//!
11//! The connection service should implement:
12//! ```text
13//! GET {base_url}/{tenant_id}/{connection_id}
14//!
15//! Response 200:
16//! {
17//!   "parameters": { ... },          // Connection credentials/config
18//!   "integration_id": "bearer",     // Connection type identifier
19//!   "connection_subtype": "...",    // Optional subtype
20//!   "rate_limit": {                 // Optional rate limit state
21//!     "is_limited": false,
22//!     "remaining": 100,
23//!     "reset_at": 1234567890,       // Unix timestamp
24//!     "retry_after_ms": 60000
25//!   }
26//! }
27//! ```
28
29use serde::{Deserialize, Serialize};
30use serde_json::Value;
31use std::time::Duration;
32
33/// Response from the connection service.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct ConnectionResponse {
36    /// Connection credentials and configuration
37    pub parameters: Value,
38
39    /// Connection type identifier (e.g., "sftp", "bearer", "api_key")
40    pub integration_id: String,
41
42    /// Optional connection subtype
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub connection_subtype: Option<String>,
45
46    /// Rate limit state (if applicable)
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub rate_limit: Option<RateLimitState>,
49}
50
51/// Rate limit state from the connection service.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct RateLimitState {
54    /// Whether the connection is currently rate limited
55    pub is_limited: bool,
56
57    /// Remaining requests in the current window (if known)
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub remaining: Option<u32>,
60
61    /// Unix timestamp when the rate limit resets
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub reset_at: Option<i64>,
64
65    /// Milliseconds to wait before retrying
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub retry_after_ms: Option<u64>,
68}
69
70/// Error fetching a connection.
71#[derive(Debug)]
72pub enum ConnectionError {
73    /// Connection not found
74    NotFound(String),
75    /// Rate limited - should wait and retry
76    RateLimited {
77        connection_id: String,
78        retry_after: Duration,
79    },
80    /// Network or HTTP error
81    FetchError(String),
82    /// Invalid response from connection service
83    InvalidResponse(String),
84}
85
86impl std::fmt::Display for ConnectionError {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            ConnectionError::NotFound(id) => write!(f, "Connection '{}' not found", id),
90            ConnectionError::RateLimited {
91                connection_id,
92                retry_after,
93            } => write!(
94                f,
95                "Connection '{}' is rate limited, retry after {:?}",
96                connection_id, retry_after
97            ),
98            ConnectionError::FetchError(msg) => write!(f, "Failed to fetch connection: {}", msg),
99            ConnectionError::InvalidResponse(msg) => {
100                write!(f, "Invalid connection response: {}", msg)
101            }
102        }
103    }
104}
105
106impl std::error::Error for ConnectionError {}
107
108/// Fetch a connection from the connection service.
109///
110/// # Arguments
111/// * `service_url` - Base URL of the connection service
112/// * `tenant_id` - Tenant identifier
113/// * `connection_id` - Connection identifier
114///
115/// # Returns
116/// Connection response with credentials and rate limit state
117pub fn fetch_connection(
118    service_url: &str,
119    tenant_id: &str,
120    connection_id: &str,
121) -> Result<ConnectionResponse, ConnectionError> {
122    let url = format!("{}/{}/{}", service_url, tenant_id, connection_id);
123
124    let response = ureq::get(&url)
125        .timeout(Duration::from_secs(30))
126        .call()
127        .map_err(|e| ConnectionError::FetchError(e.to_string()))?;
128
129    if response.status() == 404 {
130        return Err(ConnectionError::NotFound(connection_id.to_string()));
131    }
132
133    if response.status() == 429 {
134        // Rate limited by connection service itself
135        let retry_after = response
136            .header("Retry-After")
137            .and_then(|h| h.parse::<u64>().ok())
138            .unwrap_or(60);
139        return Err(ConnectionError::RateLimited {
140            connection_id: connection_id.to_string(),
141            retry_after: Duration::from_secs(retry_after),
142        });
143    }
144
145    if response.status() != 200 {
146        return Err(ConnectionError::FetchError(format!(
147            "HTTP {}",
148            response.status()
149        )));
150    }
151
152    let body = response
153        .into_string()
154        .map_err(|e| ConnectionError::FetchError(e.to_string()))?;
155
156    serde_json::from_str(&body).map_err(|e| ConnectionError::InvalidResponse(e.to_string()))
157}
158
159impl RateLimitState {
160    /// Get the duration to wait before retrying.
161    pub fn wait_duration(&self) -> Duration {
162        if let Some(ms) = self.retry_after_ms {
163            return Duration::from_millis(ms);
164        }
165
166        if let Some(reset_at) = self.reset_at {
167            let now = std::time::SystemTime::now()
168                .duration_since(std::time::UNIX_EPOCH)
169                .unwrap_or_default()
170                .as_secs() as i64;
171
172            if reset_at > now {
173                return Duration::from_secs((reset_at - now) as u64);
174            }
175        }
176
177        // Default wait time if no specific value provided
178        Duration::from_secs(60)
179    }
180}