1use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use base64::Engine;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use tracing::{debug, info, warn};
19
20use crate::backend::SdkBackend;
21use crate::error::{Result, SdkError};
22use crate::types::{
23 CheckpointResult, CustomSignal, InstanceStatus, Signal, SignalType, StatusResponse,
24};
25
26#[derive(Debug, Clone)]
28pub struct HttpSdkConfig {
29 pub instance_id: String,
31 pub tenant_id: String,
33 pub base_url: String,
35 pub request_timeout_ms: u64,
37 pub signal_poll_interval_ms: u64,
39 pub heartbeat_interval_ms: u64,
41}
42
43impl HttpSdkConfig {
44 pub fn from_env() -> Result<Self> {
48 let instance_id = std::env::var("RUNTARA_INSTANCE_ID")
49 .map_err(|_| SdkError::Config("RUNTARA_INSTANCE_ID not set".into()))?;
50 let tenant_id = std::env::var("RUNTARA_TENANT_ID")
51 .map_err(|_| SdkError::Config("RUNTARA_TENANT_ID not set".into()))?;
52
53 let base_url = if let Ok(url) = std::env::var("RUNTARA_HTTP_URL") {
55 url
56 } else if let Ok(addr) = std::env::var("RUNTARA_SERVER_ADDR") {
57 let parts: Vec<&str> = addr.split(':').collect();
59 let host = parts.first().unwrap_or(&"127.0.0.1");
60 let base_port: u16 = parts.get(1).and_then(|p| p.parse().ok()).unwrap_or(8001);
61 let http_port = std::env::var("RUNTARA_CORE_HTTP_PORT")
62 .ok()
63 .and_then(|p| p.parse().ok())
64 .unwrap_or(base_port + 2); format!("http://{}:{}", host, http_port)
66 } else {
67 "http://127.0.0.1:8003".to_string()
68 };
69
70 let request_timeout_ms = std::env::var("RUNTARA_REQUEST_TIMEOUT_MS")
71 .ok()
72 .and_then(|s| s.parse().ok())
73 .unwrap_or(30_000);
74
75 let signal_poll_interval_ms = std::env::var("RUNTARA_SIGNAL_POLL_INTERVAL_MS")
76 .ok()
77 .and_then(|s| s.parse().ok())
78 .unwrap_or(1_000);
79
80 let heartbeat_interval_ms = std::env::var("RUNTARA_HEARTBEAT_INTERVAL_MS")
81 .ok()
82 .and_then(|s| s.parse().ok())
83 .unwrap_or(30_000);
84
85 Ok(Self {
86 instance_id,
87 tenant_id,
88 base_url,
89 request_timeout_ms,
90 signal_poll_interval_ms,
91 heartbeat_interval_ms,
92 })
93 }
94}
95
96pub struct HttpBackend {
101 instance_id: String,
102 tenant_id: String,
103 base_url: String,
104 client: runtara_http::HttpClient,
105 connected: AtomicBool,
106}
107
108impl HttpBackend {
109 pub fn new(config: &HttpSdkConfig) -> Result<Self> {
111 let client = runtara_http::HttpClient::with_timeout(Duration::from_millis(
112 config.request_timeout_ms,
113 ));
114
115 Ok(Self {
116 instance_id: config.instance_id.clone(),
117 tenant_id: config.tenant_id.clone(),
118 base_url: config.base_url.trim_end_matches('/').to_string(),
119 client,
120 connected: AtomicBool::new(false),
121 })
122 }
123
124 fn url(&self, path: &str) -> String {
126 format!(
127 "{}/api/v1/instances/{}/{}",
128 self.base_url, self.instance_id, path
129 )
130 }
131
132 fn post<T: Serialize, R: for<'de> Deserialize<'de>>(&self, url: &str, body: &T) -> Result<R> {
134 let json_value = serde_json::to_value(body)
135 .map_err(|e| SdkError::Internal(format!("Failed to serialize request body: {}", e)))?;
136
137 let response = self
138 .client
139 .request("POST", url)
140 .header("Content-Type", "application/json")
141 .header("X-Runtara-Tenant-Id", &self.tenant_id)
142 .header("X-Runtara-Instance-Id", &self.instance_id)
143 .body_json(&json_value)
144 .call()
145 .map_err(|e| SdkError::Internal(format!("HTTP request failed: {}", e)))?;
146
147 if response.status >= 400 {
148 let body_text = String::from_utf8_lossy(&response.body).to_string();
149 return Err(SdkError::Internal(format!(
150 "HTTP request failed with status {}: {}",
151 response.status, body_text
152 )));
153 }
154
155 let result: R = response.into_json().map_err(|e| {
156 SdkError::UnexpectedResponse(format!("Failed to parse response: {}", e))
157 })?;
158
159 Ok(result)
160 }
161
162 fn get<R: for<'de> Deserialize<'de>>(&self, url: &str) -> Result<R> {
164 let response = self
165 .client
166 .request("GET", url)
167 .header("X-Runtara-Tenant-Id", &self.tenant_id)
168 .header("X-Runtara-Instance-Id", &self.instance_id)
169 .call()
170 .map_err(|e| SdkError::Internal(format!("HTTP request failed: {}", e)))?;
171
172 if response.status >= 400 {
173 let body_text = String::from_utf8_lossy(&response.body).to_string();
174 return Err(SdkError::Internal(format!(
175 "HTTP request failed with status {}: {}",
176 response.status, body_text
177 )));
178 }
179
180 let result: R = response.into_json().map_err(|e| {
181 SdkError::UnexpectedResponse(format!("Failed to parse response: {}", e))
182 })?;
183
184 Ok(result)
185 }
186
187 fn post_fire_and_forget<T: Serialize>(&self, url: &str, body: &T) -> Result<()> {
189 let json_value = serde_json::to_value(body)
190 .map_err(|e| SdkError::Internal(format!("Failed to serialize request body: {}", e)))?;
191
192 match self
193 .client
194 .request("POST", url)
195 .header("Content-Type", "application/json")
196 .header("X-Runtara-Tenant-Id", &self.tenant_id)
197 .header("X-Runtara-Instance-Id", &self.instance_id)
198 .body_json(&json_value)
199 .call()
200 {
201 Ok(_) => {}
202 Err(e) => {
203 warn!("Fire-and-forget request failed: {}", e);
204 }
205 }
206
207 Ok(())
208 }
209}
210
211#[derive(Serialize)]
216struct RegisterBody {
217 tenant_id: String,
218 #[serde(skip_serializing_if = "Option::is_none")]
219 checkpoint_id: Option<String>,
220}
221
222#[derive(Deserialize)]
223struct RegisterResp {
224 success: bool,
225 #[serde(default)]
226 error: Option<String>,
227}
228
229#[derive(Serialize)]
230struct CheckpointBody {
231 checkpoint_id: String,
232 state: String, }
234
235#[derive(Deserialize)]
236struct CheckpointResp {
237 found: bool,
238 #[serde(default)]
239 state: Option<String>, #[serde(default)]
241 signal: Option<SignalResp>,
242 #[serde(default)]
243 custom_signal: Option<CustomSignalResp>,
244}
245
246#[derive(Deserialize)]
247struct SignalResp {
248 signal_type: String,
249 #[serde(default)]
250 payload: Option<String>, }
252
253#[derive(Deserialize)]
254struct CustomSignalResp {
255 checkpoint_id: String,
256 #[serde(default)]
257 payload: Option<String>, }
259
260#[derive(Deserialize)]
261struct PollSignalsResp {
262 #[serde(default)]
263 signal: Option<SignalResp>,
264 #[serde(default)]
265 custom_signal: Option<CustomSignalResp>,
266}
267
268#[derive(Serialize)]
269struct EventBody {
270 event_type: String,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 checkpoint_id: Option<String>,
273 #[serde(skip_serializing_if = "Option::is_none")]
274 payload: Option<String>, #[serde(skip_serializing_if = "Option::is_none")]
276 subtype: Option<String>,
277}
278
279#[derive(Serialize)]
280struct SleepBody {
281 duration_ms: u64,
282 checkpoint_id: String,
283 state: String, }
285
286#[derive(Serialize)]
287struct SignalAckBody {
288 signal_type: String,
289}
290
291#[derive(Serialize)]
292struct RetryBody {
293 checkpoint_id: String,
294 attempt: u32,
295 #[serde(skip_serializing_if = "Option::is_none")]
296 error_message: Option<String>,
297}
298
299#[derive(Deserialize)]
300struct SuccessResp {
301 success: bool,
302}
303
304#[derive(Deserialize)]
305struct StatusResp {
306 found: bool,
307 #[serde(default)]
308 status: String,
309 #[serde(default)]
310 checkpoint_id: Option<String>,
311 #[serde(default)]
312 output: Option<String>, #[serde(default)]
314 error: Option<String>,
315}
316
317#[derive(Deserialize)]
318struct InputResp {
319 #[serde(default)]
320 input: Option<String>, }
322
323fn parse_instance_status(s: &str) -> InstanceStatus {
328 match s {
329 "pending" => InstanceStatus::Pending,
330 "running" => InstanceStatus::Running,
331 "suspended" => InstanceStatus::Suspended,
332 "completed" => InstanceStatus::Completed,
333 "failed" => InstanceStatus::Failed,
334 _ => InstanceStatus::Unknown,
335 }
336}
337
338fn parse_signal_type(s: &str) -> SignalType {
339 match s {
340 "cancel" => SignalType::Cancel,
341 "pause" => SignalType::Pause,
342 "resume" => SignalType::Resume,
343 _ => SignalType::Cancel, }
345}
346
347fn signal_type_str(st: &SignalType) -> &'static str {
348 match st {
349 SignalType::Cancel => "cancel",
350 SignalType::Pause => "pause",
351 SignalType::Resume => "resume",
352 }
353}
354
355fn encode_url_path(s: &str) -> String {
358 use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
359 const PATH_SEGMENT: &AsciiSet = &CONTROLS
361 .add(b' ')
362 .add(b'"')
363 .add(b'#')
364 .add(b'%')
365 .add(b'/')
366 .add(b':')
367 .add(b'<')
368 .add(b'>')
369 .add(b'?')
370 .add(b'@')
371 .add(b'[')
372 .add(b']')
373 .add(b'^')
374 .add(b'{')
375 .add(b'|')
376 .add(b'}');
377 utf8_percent_encode(s, PATH_SEGMENT).to_string()
378}
379
380fn decode_b64(s: &str) -> Vec<u8> {
381 base64::engine::general_purpose::STANDARD
382 .decode(s)
383 .unwrap_or_default()
384}
385
386fn encode_b64(data: &[u8]) -> String {
387 base64::engine::general_purpose::STANDARD.encode(data)
388}
389
390fn parse_signal(resp: &SignalResp) -> Signal {
391 Signal {
392 signal_type: parse_signal_type(&resp.signal_type),
393 payload: resp.payload.as_deref().map(decode_b64).unwrap_or_default(),
394 checkpoint_id: None,
395 }
396}
397
398fn parse_custom_signal(resp: &CustomSignalResp) -> CustomSignal {
399 CustomSignal {
400 checkpoint_id: resp.checkpoint_id.clone(),
401 payload: resp.payload.as_deref().map(decode_b64).unwrap_or_default(),
402 }
403}
404
405impl SdkBackend for HttpBackend {
410 fn connect(&self) -> Result<()> {
411 let url = format!("{}/health", self.base_url);
413 let resp = self.client.request("GET", &url).call().map_err(|e| {
414 SdkError::Internal(format!("Cannot reach runtara-core HTTP API: {}", e))
415 })?;
416
417 if resp.status >= 200 && resp.status < 300 {
418 self.connected.store(true, Ordering::SeqCst);
419 info!(base_url = %self.base_url, "Connected to runtara-core HTTP API");
420 Ok(())
421 } else {
422 Err(SdkError::Config(format!(
423 "Health check returned {}",
424 resp.status
425 )))
426 }
427 }
428
429 fn is_connected(&self) -> bool {
430 self.connected.load(Ordering::SeqCst)
431 }
432
433 fn close(&self) {
434 self.connected.store(false, Ordering::SeqCst);
435 debug!("HTTP backend closed");
436 }
437
438 fn register(&self, checkpoint_id: Option<&str>) -> Result<()> {
439 let body = RegisterBody {
440 tenant_id: self.tenant_id.clone(),
441 checkpoint_id: checkpoint_id.map(|s| s.to_string()),
442 };
443
444 let resp: RegisterResp = self.post(&self.url("register"), &body)?;
445
446 if resp.success {
447 info!("Instance registered via HTTP");
448 Ok(())
449 } else {
450 Err(SdkError::UnexpectedResponse(format!(
451 "Registration failed: {}",
452 resp.error.unwrap_or_default()
453 )))
454 }
455 }
456
457 fn instance_id(&self) -> &str {
458 &self.instance_id
459 }
460
461 fn tenant_id(&self) -> &str {
462 &self.tenant_id
463 }
464
465 fn checkpoint(&self, checkpoint_id: &str, state: &[u8]) -> Result<CheckpointResult> {
466 let body = CheckpointBody {
467 checkpoint_id: checkpoint_id.to_string(),
468 state: encode_b64(state),
469 };
470
471 let resp: CheckpointResp = self.post(&self.url("checkpoint"), &body)?;
472
473 Ok(CheckpointResult {
474 found: resp.found,
475 state: resp.state.as_deref().map(decode_b64).unwrap_or_default(),
476 pending_signal: resp.signal.as_ref().map(parse_signal),
477 custom_signal: resp.custom_signal.as_ref().map(parse_custom_signal),
478 })
479 }
480
481 fn get_checkpoint(&self, checkpoint_id: &str) -> Result<Option<Vec<u8>>> {
482 let body = CheckpointBody {
485 checkpoint_id: checkpoint_id.to_string(),
486 state: encode_b64(&[]),
487 };
488
489 let resp: CheckpointResp = self.post(&self.url("checkpoint"), &body)?;
490
491 if resp.found {
492 Ok(Some(
493 resp.state.as_deref().map(decode_b64).unwrap_or_default(),
494 ))
495 } else {
496 Ok(None)
497 }
498 }
499
500 fn heartbeat(&self) -> Result<()> {
501 let body = EventBody {
502 event_type: "heartbeat".to_string(),
503 checkpoint_id: None,
504 payload: None,
505 subtype: None,
506 };
507
508 self.post_fire_and_forget(&self.url("events"), &body)
509 }
510
511 fn completed(&self, output: &[u8]) -> Result<()> {
512 let body = serde_json::json!({ "output": encode_b64(output) });
513 let resp: SuccessResp = self.post(&self.url("completed"), &body)?;
514
515 if resp.success {
516 Ok(())
517 } else {
518 Err(SdkError::UnexpectedResponse(
519 "Failed to report completion".into(),
520 ))
521 }
522 }
523
524 fn failed(&self, error: &str) -> Result<()> {
525 let body = serde_json::json!({ "error": error });
526 let resp: SuccessResp = self.post(&self.url("failed"), &body)?;
527
528 if resp.success {
529 Ok(())
530 } else {
531 Err(SdkError::UnexpectedResponse(
532 "Failed to report failure".into(),
533 ))
534 }
535 }
536
537 fn suspended(&self) -> Result<()> {
538 let resp: SuccessResp = self.post(&self.url("suspended"), &serde_json::json!({}))?;
539
540 if resp.success {
541 Ok(())
542 } else {
543 Err(SdkError::UnexpectedResponse(
544 "Failed to report suspension".into(),
545 ))
546 }
547 }
548
549 fn sleep_until(&self, checkpoint_id: &str, wake_at: DateTime<Utc>, state: &[u8]) -> Result<()> {
550 let now = Utc::now();
551 let duration_ms = if wake_at > now {
552 (wake_at - now).num_milliseconds() as u64
553 } else {
554 0
555 };
556
557 self.durable_sleep(Duration::from_millis(duration_ms), checkpoint_id, state)
558 }
559
560 fn durable_sleep(&self, duration: Duration, checkpoint_id: &str, state: &[u8]) -> Result<()> {
561 let body = SleepBody {
562 duration_ms: duration.as_millis() as u64,
563 checkpoint_id: checkpoint_id.to_string(),
564 state: encode_b64(state),
565 };
566
567 let resp: SuccessResp = self.post(&self.url("sleep"), &body)?;
568
569 if resp.success {
570 Ok(())
571 } else {
572 Err(SdkError::UnexpectedResponse(
573 "Durable sleep request failed".into(),
574 ))
575 }
576 }
577
578 fn set_sleep_until(&self, _sleep_until: DateTime<Utc>) -> Result<()> {
579 Ok(())
581 }
582
583 fn clear_sleep(&self) -> Result<()> {
584 Ok(())
586 }
587
588 fn get_sleep_until(&self) -> Result<Option<DateTime<Utc>>> {
589 Ok(None)
591 }
592
593 fn send_custom_event(&self, subtype: &str, payload: Vec<u8>) -> Result<()> {
594 let body = EventBody {
595 event_type: "custom".to_string(),
596 checkpoint_id: None,
597 payload: Some(encode_b64(&payload)),
598 subtype: Some(subtype.to_string()),
599 };
600
601 let resp: SuccessResp = self.post(&self.url("events"), &body)?;
602
603 if resp.success {
604 Ok(())
605 } else {
606 Err(SdkError::UnexpectedResponse("Custom event failed".into()))
607 }
608 }
609
610 fn record_retry_attempt(
611 &self,
612 checkpoint_id: &str,
613 attempt_number: u32,
614 error_message: Option<&str>,
615 ) -> Result<()> {
616 let body = RetryBody {
617 checkpoint_id: checkpoint_id.to_string(),
618 attempt: attempt_number,
619 error_message: error_message.map(|s| s.to_string()),
620 };
621
622 self.post_fire_and_forget(&self.url("retry"), &body)
623 }
624
625 fn get_status(&self) -> Result<StatusResponse> {
626 self.get_instance_status(&self.instance_id)
627 }
628
629 fn poll_signals(
630 &self,
631 checkpoint_id: Option<&str>,
632 ) -> Result<(Option<Signal>, Option<CustomSignal>)> {
633 let url = match checkpoint_id {
634 Some(cp_id) => format!(
635 "{}/api/v1/instances/{}/signals/{}",
636 self.base_url,
637 self.instance_id,
638 encode_url_path(cp_id)
639 ),
640 None => format!(
641 "{}/api/v1/instances/{}/signals",
642 self.base_url, self.instance_id
643 ),
644 };
645
646 let resp: PollSignalsResp = self.get(&url)?;
647 let signal = resp.signal.as_ref().map(parse_signal);
648 let custom = resp.custom_signal.as_ref().map(parse_custom_signal);
649 Ok((signal, custom))
650 }
651
652 fn acknowledge_signal(&self, signal_type: SignalType) -> Result<()> {
653 let body = SignalAckBody {
654 signal_type: signal_type_str(&signal_type).to_string(),
655 };
656
657 let _: SuccessResp = self.post(&self.url("signals/ack"), &body)?;
658 Ok(())
659 }
660
661 fn get_instance_status(&self, instance_id: &str) -> Result<StatusResponse> {
662 let url = format!("{}/api/v1/instances/{}/status", self.base_url, instance_id);
663
664 let resp: StatusResp = self.get(&url)?;
665
666 Ok(StatusResponse {
667 found: resp.found,
668 status: parse_instance_status(&resp.status),
669 checkpoint_id: resp.checkpoint_id,
670 output: resp.output.as_deref().map(decode_b64),
671 error: resp.error,
672 })
673 }
674
675 fn load_input(&self) -> Result<Option<Vec<u8>>> {
676 let url = format!(
677 "{}/api/v1/instances/{}/input",
678 self.base_url, self.instance_id
679 );
680
681 let resp: InputResp = self.get(&url)?;
682 Ok(resp.input.as_deref().map(decode_b64))
683 }
684}
685
686impl std::fmt::Debug for HttpBackend {
687 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688 f.debug_struct("HttpBackend")
689 .field("instance_id", &self.instance_id)
690 .field("tenant_id", &self.tenant_id)
691 .field("base_url", &self.base_url)
692 .field("connected", &self.connected.load(Ordering::SeqCst))
693 .finish()
694 }
695}