Skip to main content

runtara_sdk/backend/
http.rs

1// Copyright (C) 2025 SyncMyOrders Sp. z o.o.
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! HTTP backend for runtara-sdk.
4//!
5//! Implements `SdkBackend` using HTTP/JSON to communicate with runtara-core's
6//! HTTP instance API.
7//!
8//! Used by:
9//! - Native scenarios with `RUNTARA_SDK_BACKEND=http`
10//! - WASM scenarios (future, via wasi-http)
11
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use base64::Engine;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use tracing::{debug, info, warn};
19
20use crate::backend::SdkBackend;
21use crate::error::{Result, SdkError};
22use crate::types::{
23    CheckpointResult, CustomSignal, InstanceStatus, Signal, SignalType, StatusResponse,
24};
25
/// Settings the HTTP backend needs to identify its instance and reach
/// runtara-core over HTTP.
#[derive(Clone, Debug)]
pub struct HttpSdkConfig {
    /// Identifier of the instance this process runs as (required).
    pub instance_id: String,
    /// Identifier of the tenant that owns the instance (required).
    pub tenant_id: String,
    /// Root URL of the runtara-core HTTP API, e.g. `http://127.0.0.1:8003`.
    pub base_url: String,
    /// Per-request timeout, in milliseconds (default: 30000).
    pub request_timeout_ms: u64,
    /// Delay between signal polls, in milliseconds (default: 1000).
    pub signal_poll_interval_ms: u64,
    /// Delay between heartbeats, in milliseconds (default: 30000, 0 to disable).
    pub heartbeat_interval_ms: u64,
}
42
43impl HttpSdkConfig {
44    /// Create config from environment variables.
45    ///
46    /// Required: `RUNTARA_INSTANCE_ID`, `RUNTARA_TENANT_ID`, `RUNTARA_HTTP_URL`
47    pub fn from_env() -> Result<Self> {
48        let instance_id = std::env::var("RUNTARA_INSTANCE_ID")
49            .map_err(|_| SdkError::Config("RUNTARA_INSTANCE_ID not set".into()))?;
50        let tenant_id = std::env::var("RUNTARA_TENANT_ID")
51            .map_err(|_| SdkError::Config("RUNTARA_TENANT_ID not set".into()))?;
52
53        // Try RUNTARA_HTTP_URL first, then derive from RUNTARA_SERVER_ADDR
54        let base_url = if let Ok(url) = std::env::var("RUNTARA_HTTP_URL") {
55            url
56        } else if let Ok(addr) = std::env::var("RUNTARA_SERVER_ADDR") {
57            // RUNTARA_SERVER_ADDR is host:port. HTTP is typically on port+2.
58            let parts: Vec<&str> = addr.split(':').collect();
59            let host = parts.first().unwrap_or(&"127.0.0.1");
60            let base_port: u16 = parts.get(1).and_then(|p| p.parse().ok()).unwrap_or(8001);
61            let http_port = std::env::var("RUNTARA_CORE_HTTP_PORT")
62                .ok()
63                .and_then(|p| p.parse().ok())
64                .unwrap_or(base_port + 2); // Default: base port + 2 (8001 → 8003)
65            format!("http://{}:{}", host, http_port)
66        } else {
67            "http://127.0.0.1:8003".to_string()
68        };
69
70        let request_timeout_ms = std::env::var("RUNTARA_REQUEST_TIMEOUT_MS")
71            .ok()
72            .and_then(|s| s.parse().ok())
73            .unwrap_or(30_000);
74
75        let signal_poll_interval_ms = std::env::var("RUNTARA_SIGNAL_POLL_INTERVAL_MS")
76            .ok()
77            .and_then(|s| s.parse().ok())
78            .unwrap_or(1_000);
79
80        let heartbeat_interval_ms = std::env::var("RUNTARA_HEARTBEAT_INTERVAL_MS")
81            .ok()
82            .and_then(|s| s.parse().ok())
83            .unwrap_or(30_000);
84
85        Ok(Self {
86            instance_id,
87            tenant_id,
88            base_url,
89            request_timeout_ms,
90            signal_poll_interval_ms,
91            heartbeat_interval_ms,
92        })
93    }
94}
95
96/// HTTP backend for the SDK.
97///
98/// Uses `runtara_http::HttpClient` for HTTP calls to runtara-core's HTTP instance API.
99/// All operations are request-response over HTTP/JSON with base64-encoded binary data.
100pub struct HttpBackend {
101    instance_id: String,
102    tenant_id: String,
103    base_url: String,
104    client: runtara_http::HttpClient,
105    connected: AtomicBool,
106}
107
108impl HttpBackend {
109    /// Create a new HTTP backend from config.
110    pub fn new(config: &HttpSdkConfig) -> Result<Self> {
111        let client = runtara_http::HttpClient::with_timeout(Duration::from_millis(
112            config.request_timeout_ms,
113        ));
114
115        Ok(Self {
116            instance_id: config.instance_id.clone(),
117            tenant_id: config.tenant_id.clone(),
118            base_url: config.base_url.trim_end_matches('/').to_string(),
119            client,
120            connected: AtomicBool::new(false),
121        })
122    }
123
124    /// Build URL for an instance endpoint.
125    fn url(&self, path: &str) -> String {
126        format!(
127            "{}/api/v1/instances/{}/{}",
128            self.base_url, self.instance_id, path
129        )
130    }
131
132    /// POST JSON to an endpoint and deserialize the response.
133    fn post<T: Serialize, R: for<'de> Deserialize<'de>>(&self, url: &str, body: &T) -> Result<R> {
134        let json_value = serde_json::to_value(body)
135            .map_err(|e| SdkError::Internal(format!("Failed to serialize request body: {}", e)))?;
136
137        let response = self
138            .client
139            .request("POST", url)
140            .header("Content-Type", "application/json")
141            .header("X-Runtara-Tenant-Id", &self.tenant_id)
142            .header("X-Runtara-Instance-Id", &self.instance_id)
143            .body_json(&json_value)
144            .call()
145            .map_err(|e| SdkError::Internal(format!("HTTP request failed: {}", e)))?;
146
147        if response.status >= 400 {
148            let body_text = String::from_utf8_lossy(&response.body).to_string();
149            return Err(SdkError::Internal(format!(
150                "HTTP request failed with status {}: {}",
151                response.status, body_text
152            )));
153        }
154
155        let result: R = response.into_json().map_err(|e| {
156            SdkError::UnexpectedResponse(format!("Failed to parse response: {}", e))
157        })?;
158
159        Ok(result)
160    }
161
162    /// GET from an endpoint and deserialize the response.
163    fn get<R: for<'de> Deserialize<'de>>(&self, url: &str) -> Result<R> {
164        let response = self
165            .client
166            .request("GET", url)
167            .header("X-Runtara-Tenant-Id", &self.tenant_id)
168            .header("X-Runtara-Instance-Id", &self.instance_id)
169            .call()
170            .map_err(|e| SdkError::Internal(format!("HTTP request failed: {}", e)))?;
171
172        if response.status >= 400 {
173            let body_text = String::from_utf8_lossy(&response.body).to_string();
174            return Err(SdkError::Internal(format!(
175                "HTTP request failed with status {}: {}",
176                response.status, body_text
177            )));
178        }
179
180        let result: R = response.into_json().map_err(|e| {
181            SdkError::UnexpectedResponse(format!("Failed to parse response: {}", e))
182        })?;
183
184        Ok(result)
185    }
186
187    /// POST JSON fire-and-forget (ignore response body, just check status).
188    fn post_fire_and_forget<T: Serialize>(&self, url: &str, body: &T) -> Result<()> {
189        let json_value = serde_json::to_value(body)
190            .map_err(|e| SdkError::Internal(format!("Failed to serialize request body: {}", e)))?;
191
192        match self
193            .client
194            .request("POST", url)
195            .header("Content-Type", "application/json")
196            .header("X-Runtara-Tenant-Id", &self.tenant_id)
197            .header("X-Runtara-Instance-Id", &self.instance_id)
198            .body_json(&json_value)
199            .call()
200        {
201            Ok(_) => {}
202            Err(e) => {
203                warn!("Fire-and-forget request failed: {}", e);
204            }
205        }
206
207        Ok(())
208    }
209}
210
211// ============================================================================
212// JSON types for HTTP API communication
213// ============================================================================
214
215#[derive(Serialize)]
216struct RegisterBody {
217    tenant_id: String,
218    #[serde(skip_serializing_if = "Option::is_none")]
219    checkpoint_id: Option<String>,
220}
221
222#[derive(Deserialize)]
223struct RegisterResp {
224    success: bool,
225    #[serde(default)]
226    error: Option<String>,
227}
228
229#[derive(Serialize)]
230struct CheckpointBody {
231    checkpoint_id: String,
232    state: String, // base64
233}
234
235#[derive(Deserialize)]
236struct CheckpointResp {
237    found: bool,
238    #[serde(default)]
239    state: Option<String>, // base64
240    #[serde(default)]
241    signal: Option<SignalResp>,
242    #[serde(default)]
243    custom_signal: Option<CustomSignalResp>,
244}
245
246#[derive(Deserialize)]
247struct SignalResp {
248    signal_type: String,
249    #[serde(default)]
250    payload: Option<String>, // base64
251}
252
253#[derive(Deserialize)]
254struct CustomSignalResp {
255    checkpoint_id: String,
256    #[serde(default)]
257    payload: Option<String>, // base64
258}
259
260#[derive(Deserialize)]
261struct PollSignalsResp {
262    #[serde(default)]
263    signal: Option<SignalResp>,
264    #[serde(default)]
265    custom_signal: Option<CustomSignalResp>,
266}
267
268#[derive(Serialize)]
269struct EventBody {
270    event_type: String,
271    #[serde(skip_serializing_if = "Option::is_none")]
272    checkpoint_id: Option<String>,
273    #[serde(skip_serializing_if = "Option::is_none")]
274    payload: Option<String>, // base64
275    #[serde(skip_serializing_if = "Option::is_none")]
276    subtype: Option<String>,
277}
278
279#[derive(Serialize)]
280struct SleepBody {
281    duration_ms: u64,
282    checkpoint_id: String,
283    state: String, // base64
284}
285
286#[derive(Serialize)]
287struct SignalAckBody {
288    signal_type: String,
289}
290
291#[derive(Serialize)]
292struct RetryBody {
293    checkpoint_id: String,
294    attempt: u32,
295    #[serde(skip_serializing_if = "Option::is_none")]
296    error_message: Option<String>,
297}
298
299#[derive(Deserialize)]
300struct SuccessResp {
301    success: bool,
302}
303
304#[derive(Deserialize)]
305struct StatusResp {
306    found: bool,
307    #[serde(default)]
308    status: String,
309    #[serde(default)]
310    checkpoint_id: Option<String>,
311    #[serde(default)]
312    output: Option<String>, // base64
313    #[serde(default)]
314    error: Option<String>,
315}
316
317#[derive(Deserialize)]
318struct InputResp {
319    #[serde(default)]
320    input: Option<String>, // base64
321}
322
323// ============================================================================
324// Helper: convert signal types
325// ============================================================================
326
327fn parse_instance_status(s: &str) -> InstanceStatus {
328    match s {
329        "pending" => InstanceStatus::Pending,
330        "running" => InstanceStatus::Running,
331        "suspended" => InstanceStatus::Suspended,
332        "completed" => InstanceStatus::Completed,
333        "failed" => InstanceStatus::Failed,
334        _ => InstanceStatus::Unknown,
335    }
336}
337
338fn parse_signal_type(s: &str) -> SignalType {
339    match s {
340        "cancel" => SignalType::Cancel,
341        "pause" => SignalType::Pause,
342        "resume" => SignalType::Resume,
343        _ => SignalType::Cancel, // safe default
344    }
345}
346
347fn signal_type_str(st: &SignalType) -> &'static str {
348    match st {
349        SignalType::Cancel => "cancel",
350        SignalType::Pause => "pause",
351        SignalType::Resume => "resume",
352    }
353}
354
355/// Percent-encode a string for use in a URL path segment.
356/// Encodes characters that are not allowed in path segments (e.g., `/`, `:`, `?`, `#`).
357fn encode_url_path(s: &str) -> String {
358    use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
359    // Encode everything that's not unreserved per RFC 3986, plus `/` and `:`
360    const PATH_SEGMENT: &AsciiSet = &CONTROLS
361        .add(b' ')
362        .add(b'"')
363        .add(b'#')
364        .add(b'%')
365        .add(b'/')
366        .add(b':')
367        .add(b'<')
368        .add(b'>')
369        .add(b'?')
370        .add(b'@')
371        .add(b'[')
372        .add(b']')
373        .add(b'^')
374        .add(b'{')
375        .add(b'|')
376        .add(b'}');
377    utf8_percent_encode(s, PATH_SEGMENT).to_string()
378}
379
380fn decode_b64(s: &str) -> Vec<u8> {
381    base64::engine::general_purpose::STANDARD
382        .decode(s)
383        .unwrap_or_default()
384}
385
386fn encode_b64(data: &[u8]) -> String {
387    base64::engine::general_purpose::STANDARD.encode(data)
388}
389
390fn parse_signal(resp: &SignalResp) -> Signal {
391    Signal {
392        signal_type: parse_signal_type(&resp.signal_type),
393        payload: resp.payload.as_deref().map(decode_b64).unwrap_or_default(),
394        checkpoint_id: None,
395    }
396}
397
398fn parse_custom_signal(resp: &CustomSignalResp) -> CustomSignal {
399    CustomSignal {
400        checkpoint_id: resp.checkpoint_id.clone(),
401        payload: resp.payload.as_deref().map(decode_b64).unwrap_or_default(),
402    }
403}
404
405// ============================================================================
406// SdkBackend implementation
407// ============================================================================
408
409impl SdkBackend for HttpBackend {
410    fn connect(&self) -> Result<()> {
411        // HTTP is connectionless — verify reachability with a health check
412        let url = format!("{}/health", self.base_url);
413        let resp = self.client.request("GET", &url).call().map_err(|e| {
414            SdkError::Internal(format!("Cannot reach runtara-core HTTP API: {}", e))
415        })?;
416
417        if resp.status >= 200 && resp.status < 300 {
418            self.connected.store(true, Ordering::SeqCst);
419            info!(base_url = %self.base_url, "Connected to runtara-core HTTP API");
420            Ok(())
421        } else {
422            Err(SdkError::Config(format!(
423                "Health check returned {}",
424                resp.status
425            )))
426        }
427    }
428
429    fn is_connected(&self) -> bool {
430        self.connected.load(Ordering::SeqCst)
431    }
432
433    fn close(&self) {
434        self.connected.store(false, Ordering::SeqCst);
435        debug!("HTTP backend closed");
436    }
437
438    fn register(&self, checkpoint_id: Option<&str>) -> Result<()> {
439        let body = RegisterBody {
440            tenant_id: self.tenant_id.clone(),
441            checkpoint_id: checkpoint_id.map(|s| s.to_string()),
442        };
443
444        let resp: RegisterResp = self.post(&self.url("register"), &body)?;
445
446        if resp.success {
447            info!("Instance registered via HTTP");
448            Ok(())
449        } else {
450            Err(SdkError::UnexpectedResponse(format!(
451                "Registration failed: {}",
452                resp.error.unwrap_or_default()
453            )))
454        }
455    }
456
457    fn instance_id(&self) -> &str {
458        &self.instance_id
459    }
460
461    fn tenant_id(&self) -> &str {
462        &self.tenant_id
463    }
464
465    fn checkpoint(&self, checkpoint_id: &str, state: &[u8]) -> Result<CheckpointResult> {
466        let body = CheckpointBody {
467            checkpoint_id: checkpoint_id.to_string(),
468            state: encode_b64(state),
469        };
470
471        let resp: CheckpointResp = self.post(&self.url("checkpoint"), &body)?;
472
473        Ok(CheckpointResult {
474            found: resp.found,
475            state: resp.state.as_deref().map(decode_b64).unwrap_or_default(),
476            pending_signal: resp.signal.as_ref().map(parse_signal),
477            custom_signal: resp.custom_signal.as_ref().map(parse_custom_signal),
478        })
479    }
480
481    fn get_checkpoint(&self, checkpoint_id: &str) -> Result<Option<Vec<u8>>> {
482        // Use checkpoint endpoint with empty state to check if exists
483        // The HTTP API's checkpoint endpoint handles this: if checkpoint exists, returns it
484        let body = CheckpointBody {
485            checkpoint_id: checkpoint_id.to_string(),
486            state: encode_b64(&[]),
487        };
488
489        let resp: CheckpointResp = self.post(&self.url("checkpoint"), &body)?;
490
491        if resp.found {
492            Ok(Some(
493                resp.state.as_deref().map(decode_b64).unwrap_or_default(),
494            ))
495        } else {
496            Ok(None)
497        }
498    }
499
500    fn heartbeat(&self) -> Result<()> {
501        let body = EventBody {
502            event_type: "heartbeat".to_string(),
503            checkpoint_id: None,
504            payload: None,
505            subtype: None,
506        };
507
508        self.post_fire_and_forget(&self.url("events"), &body)
509    }
510
511    fn completed(&self, output: &[u8]) -> Result<()> {
512        let body = serde_json::json!({ "output": encode_b64(output) });
513        let resp: SuccessResp = self.post(&self.url("completed"), &body)?;
514
515        if resp.success {
516            Ok(())
517        } else {
518            Err(SdkError::UnexpectedResponse(
519                "Failed to report completion".into(),
520            ))
521        }
522    }
523
524    fn failed(&self, error: &str) -> Result<()> {
525        let body = serde_json::json!({ "error": error });
526        let resp: SuccessResp = self.post(&self.url("failed"), &body)?;
527
528        if resp.success {
529            Ok(())
530        } else {
531            Err(SdkError::UnexpectedResponse(
532                "Failed to report failure".into(),
533            ))
534        }
535    }
536
537    fn suspended(&self) -> Result<()> {
538        let resp: SuccessResp = self.post(&self.url("suspended"), &serde_json::json!({}))?;
539
540        if resp.success {
541            Ok(())
542        } else {
543            Err(SdkError::UnexpectedResponse(
544                "Failed to report suspension".into(),
545            ))
546        }
547    }
548
549    fn sleep_until(&self, checkpoint_id: &str, wake_at: DateTime<Utc>, state: &[u8]) -> Result<()> {
550        let now = Utc::now();
551        let duration_ms = if wake_at > now {
552            (wake_at - now).num_milliseconds() as u64
553        } else {
554            0
555        };
556
557        self.durable_sleep(Duration::from_millis(duration_ms), checkpoint_id, state)
558    }
559
560    fn durable_sleep(&self, duration: Duration, checkpoint_id: &str, state: &[u8]) -> Result<()> {
561        let body = SleepBody {
562            duration_ms: duration.as_millis() as u64,
563            checkpoint_id: checkpoint_id.to_string(),
564            state: encode_b64(state),
565        };
566
567        let resp: SuccessResp = self.post(&self.url("sleep"), &body)?;
568
569        if resp.success {
570            Ok(())
571        } else {
572            Err(SdkError::UnexpectedResponse(
573                "Durable sleep request failed".into(),
574            ))
575        }
576    }
577
578    fn set_sleep_until(&self, _sleep_until: DateTime<Utc>) -> Result<()> {
579        // Server-side managed — no-op for HTTP backend
580        Ok(())
581    }
582
583    fn clear_sleep(&self) -> Result<()> {
584        // Server-side managed — no-op for HTTP backend
585        Ok(())
586    }
587
588    fn get_sleep_until(&self) -> Result<Option<DateTime<Utc>>> {
589        // Would need a separate endpoint; not currently needed by SDK
590        Ok(None)
591    }
592
593    fn send_custom_event(&self, subtype: &str, payload: Vec<u8>) -> Result<()> {
594        let body = EventBody {
595            event_type: "custom".to_string(),
596            checkpoint_id: None,
597            payload: Some(encode_b64(&payload)),
598            subtype: Some(subtype.to_string()),
599        };
600
601        let resp: SuccessResp = self.post(&self.url("events"), &body)?;
602
603        if resp.success {
604            Ok(())
605        } else {
606            Err(SdkError::UnexpectedResponse("Custom event failed".into()))
607        }
608    }
609
610    fn record_retry_attempt(
611        &self,
612        checkpoint_id: &str,
613        attempt_number: u32,
614        error_message: Option<&str>,
615    ) -> Result<()> {
616        let body = RetryBody {
617            checkpoint_id: checkpoint_id.to_string(),
618            attempt: attempt_number,
619            error_message: error_message.map(|s| s.to_string()),
620        };
621
622        self.post_fire_and_forget(&self.url("retry"), &body)
623    }
624
625    fn get_status(&self) -> Result<StatusResponse> {
626        self.get_instance_status(&self.instance_id)
627    }
628
629    fn poll_signals(
630        &self,
631        checkpoint_id: Option<&str>,
632    ) -> Result<(Option<Signal>, Option<CustomSignal>)> {
633        let url = match checkpoint_id {
634            Some(cp_id) => format!(
635                "{}/api/v1/instances/{}/signals/{}",
636                self.base_url,
637                self.instance_id,
638                encode_url_path(cp_id)
639            ),
640            None => format!(
641                "{}/api/v1/instances/{}/signals",
642                self.base_url, self.instance_id
643            ),
644        };
645
646        let resp: PollSignalsResp = self.get(&url)?;
647        let signal = resp.signal.as_ref().map(parse_signal);
648        let custom = resp.custom_signal.as_ref().map(parse_custom_signal);
649        Ok((signal, custom))
650    }
651
652    fn acknowledge_signal(&self, signal_type: SignalType) -> Result<()> {
653        let body = SignalAckBody {
654            signal_type: signal_type_str(&signal_type).to_string(),
655        };
656
657        let _: SuccessResp = self.post(&self.url("signals/ack"), &body)?;
658        Ok(())
659    }
660
661    fn get_instance_status(&self, instance_id: &str) -> Result<StatusResponse> {
662        let url = format!("{}/api/v1/instances/{}/status", self.base_url, instance_id);
663
664        let resp: StatusResp = self.get(&url)?;
665
666        Ok(StatusResponse {
667            found: resp.found,
668            status: parse_instance_status(&resp.status),
669            checkpoint_id: resp.checkpoint_id,
670            output: resp.output.as_deref().map(decode_b64),
671            error: resp.error,
672        })
673    }
674
675    fn load_input(&self) -> Result<Option<Vec<u8>>> {
676        let url = format!(
677            "{}/api/v1/instances/{}/input",
678            self.base_url, self.instance_id
679        );
680
681        let resp: InputResp = self.get(&url)?;
682        Ok(resp.input.as_deref().map(decode_b64))
683    }
684}
685
686impl std::fmt::Debug for HttpBackend {
687    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688        f.debug_struct("HttpBackend")
689            .field("instance_id", &self.instance_id)
690            .field("tenant_id", &self.tenant_id)
691            .field("base_url", &self.base_url)
692            .field("connected", &self.connected.load(Ordering::SeqCst))
693            .finish()
694    }
695}