use std::time::Duration;
use std::time::Instant;
use tracing::{debug, info, instrument};
use crate::backend::SdkBackend;
use crate::error::{Result, SdkError};
use crate::types::{CheckpointResult, Signal, SignalType, StatusResponse};
/// Client handle that forwards instance operations to an [`SdkBackend`] and keeps
/// local registration and signal-polling state.
pub struct RuntaraSdk {
/// Backend that every operation is delegated to.
backend: std::sync::Arc<dyn SdkBackend>,
/// Set to `true` once `register` has succeeded.
registered: bool,
/// Time of the most recent backend signal poll; used to rate-limit `poll_signal`.
last_signal_poll: Instant,
/// Signal stashed by the `check_*` helpers; handed out first by the next `poll_signal`.
pending_signal: Option<Signal>,
/// Minimum gap, in milliseconds, between rate-limited backend signal polls.
signal_poll_interval_ms: u64,
/// Heartbeat interval in milliseconds, exposed through `heartbeat_interval_ms()`.
heartbeat_interval_ms: u64,
}
impl RuntaraSdk {
/// Creates an SDK instance backed by an `HttpBackend` built from `config`.
///
/// The signal poll interval and heartbeat interval are copied from `config`. The new
/// instance starts unregistered and with no pending signal.
///
/// # Errors
///
/// Propagates the error returned by `HttpBackend::new` when the backend cannot be built.
#[cfg(feature = "http")]
pub fn new(config: crate::backend::http::HttpSdkConfig) -> Result<Self> {
    use crate::backend::http::HttpBackend;

    let signal_poll_interval_ms = config.signal_poll_interval_ms;
    let heartbeat_interval_ms = config.heartbeat_interval_ms;
    let backend = HttpBackend::new(&config)?;

    // Back-date the last poll so the very first `poll_signal` call is not rate-limited.
    // A fixed 60s back-date is too short when the configured interval exceeds 60s, so
    // back-date by whichever is larger. If that cannot be represented, fall back to the
    // 60s back-date and finally to "now".
    let now = Instant::now();
    let fixed_backdate = Duration::from_secs(60);
    let backdate = Duration::from_millis(signal_poll_interval_ms).max(fixed_backdate);
    let last_signal_poll = now
        .checked_sub(backdate)
        .or_else(|| now.checked_sub(fixed_backdate))
        .unwrap_or(now);

    Ok(Self {
        backend: std::sync::Arc::new(backend),
        registered: false,
        last_signal_poll,
        pending_signal: None,
        signal_poll_interval_ms,
        heartbeat_interval_ms,
    })
}
/// Creates an SDK instance from an `HttpSdkConfig` loaded via `HttpSdkConfig::from_env`.
///
/// # Errors
///
/// Propagates any error from loading the configuration or from [`RuntaraSdk::new`].
#[cfg(feature = "http")]
pub fn from_env() -> Result<Self> {
    use crate::backend::http::HttpSdkConfig;

    Self::new(HttpSdkConfig::from_env()?)
}
/// Creates an SDK instance backed by an `EmbeddedBackend` using the default intervals:
/// a 1 000 ms signal poll interval and a 30 000 ms heartbeat interval.
///
/// The new instance starts unregistered and with no pending signal.
#[cfg(feature = "embedded")]
pub fn embedded(
    persistence: std::sync::Arc<dyn runtara_core::persistence::Persistence>,
    instance_id: impl Into<String>,
    tenant_id: impl Into<String>,
) -> Self {
    // Same construction as `with_embedded_backend`, with the default intervals filled in.
    Self::with_embedded_backend(persistence, instance_id, tenant_id, 1_000, 30_000)
}
/// Creates an SDK instance backed by an `EmbeddedBackend` with caller-supplied intervals.
///
/// `signal_poll_interval_ms` is the minimum gap between rate-limited signal polls and
/// `heartbeat_interval_ms` is the value later reported by `heartbeat_interval_ms()`.
/// The new instance starts unregistered and with no pending signal.
#[cfg(feature = "embedded")]
pub fn with_embedded_backend(
    persistence: std::sync::Arc<dyn runtara_core::persistence::Persistence>,
    instance_id: impl Into<String>,
    tenant_id: impl Into<String>,
    signal_poll_interval_ms: u64,
    heartbeat_interval_ms: u64,
) -> Self {
    use crate::backend::embedded::EmbeddedBackend;

    let backend = EmbeddedBackend::new(persistence, instance_id, tenant_id);

    // Back-date the last poll so the very first `poll_signal` call is not rate-limited.
    // A fixed 60s back-date is too short when the requested interval exceeds 60s, so
    // back-date by whichever is larger. If that cannot be represented, fall back to the
    // 60s back-date and finally to "now".
    let now = Instant::now();
    let fixed_backdate = Duration::from_secs(60);
    let backdate = Duration::from_millis(signal_poll_interval_ms).max(fixed_backdate);
    let last_signal_poll = now
        .checked_sub(backdate)
        .or_else(|| now.checked_sub(fixed_backdate))
        .unwrap_or(now);

    Self {
        backend: std::sync::Arc::new(backend),
        registered: false,
        last_signal_poll,
        pending_signal: None,
        signal_poll_interval_ms,
        heartbeat_interval_ms,
    }
}
/// Connects, registers, and then hands this instance to `crate::register_sdk`.
///
/// Consumes `self`. `checkpoint_id` is forwarded to `register`.
///
/// # Errors
///
/// Returns the first error from `connect` or `register`; in that case the instance is
/// not passed to `crate::register_sdk`.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn init(mut self, checkpoint_id: Option<&str>) -> Result<()> {
    // Registration is only attempted once the connection has succeeded.
    self.connect()
        .and_then(|()| self.register(checkpoint_id))?;

    crate::register_sdk(self);
    info!("SDK initialized globally");
    Ok(())
}
/// Asks the backend to connect, logging before the attempt and after success.
///
/// # Errors
///
/// Propagates the backend's connect error; the success message is not logged then.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn connect(&self) -> Result<()> {
    info!("Connecting to runtara-core");
    let backend = &self.backend;
    backend.connect()?;
    info!("Connected to runtara-core");
    Ok(())
}
/// Returns whatever the backend reports from its `is_connected` method.
pub fn is_connected(&self) -> bool {
    let backend = &self.backend;
    backend.is_connected()
}
/// Calls the backend's `close` method.
pub fn close(&self) {
    let backend = &self.backend;
    backend.close();
}
/// Registers this instance with the backend and marks it as registered locally.
///
/// `checkpoint_id` is forwarded to the backend unchanged.
///
/// # Errors
///
/// Propagates the backend's error; the `registered` flag is left untouched in that case.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn register(&mut self, checkpoint_id: Option<&str>) -> Result<()> {
    let outcome = self.backend.register(checkpoint_id);
    outcome?;

    // Only flip the flag after the backend has accepted the registration.
    self.registered = true;
    info!("Instance registered");
    Ok(())
}
/// Forwards `checkpoint_id` and the `state` bytes to the backend's `checkpoint` call
/// and returns its result.
#[instrument(skip(self, state), fields(instance_id = %self.backend.instance_id(), checkpoint_id = %checkpoint_id, state_size = state.len()))]
pub fn checkpoint(&self, checkpoint_id: &str, state: &[u8]) -> Result<CheckpointResult> {
    let backend = &self.backend;
    backend.checkpoint(checkpoint_id, state)
}
/// Asks the backend for the data stored under `checkpoint_id` and returns its answer,
/// which is `None` when the backend reports no data.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id(), checkpoint_id = %checkpoint_id))]
pub fn get_checkpoint(&self, checkpoint_id: &str) -> Result<Option<Vec<u8>>> {
    let backend = &self.backend;
    backend.get_checkpoint(checkpoint_id)
}
/// Forwards `duration`, `checkpoint_id` and `state` to the backend's `durable_sleep`
/// call and returns its result.
#[instrument(skip(self, state), fields(instance_id = %self.backend.instance_id(), duration_ms = duration.as_millis() as u64))]
pub fn sleep(&self, duration: Duration, checkpoint_id: &str, state: &[u8]) -> Result<()> {
    let backend = &self.backend;
    backend.durable_sleep(duration, checkpoint_id, state)
}
/// Calls the backend's `heartbeat` method and returns its result.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn heartbeat(&self) -> Result<()> {
    let backend = &self.backend;
    backend.heartbeat()
}
/// Returns the result of the backend's `load_input` call, which is `None` when the
/// backend reports no input.
pub fn load_input(&self) -> Result<Option<Vec<u8>>> {
    let backend = &self.backend;
    backend.load_input()
}
/// Forwards the `output` bytes to the backend's `completed` call and returns its result.
#[instrument(skip(self, output), fields(instance_id = %self.backend.instance_id(), output_size = output.len()))]
pub fn completed(&self, output: &[u8]) -> Result<()> {
    let backend = &self.backend;
    backend.completed(output)
}
/// Forwards the `error` message to the backend's `failed` call and returns its result.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn failed(&self, error: &str) -> Result<()> {
    let backend = &self.backend;
    backend.failed(error)
}
/// Calls the backend's `suspended` method and returns its result.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn suspended(&self) -> Result<()> {
    let backend = &self.backend;
    backend.suspended()
}
/// Forwards `checkpoint_id`, the `wake_at` timestamp and `state` to the backend's
/// `sleep_until` call and returns its result.
#[instrument(skip(self, state), fields(instance_id = %self.backend.instance_id(), checkpoint_id = %checkpoint_id))]
pub fn sleep_until(
    &self,
    checkpoint_id: &str,
    wake_at: chrono::DateTime<chrono::Utc>,
    state: &[u8],
) -> Result<()> {
    let backend = &self.backend;
    backend.sleep_until(checkpoint_id, wake_at, state)
}
/// Forwards `subtype` and the owned `payload` to the backend's `send_custom_event`
/// call and returns its result.
#[instrument(skip(self, payload), fields(instance_id = %self.backend.instance_id(), subtype = %subtype))]
pub fn custom_event(&self, subtype: &str, payload: Vec<u8>) -> Result<()> {
    let backend = &self.backend;
    backend.send_custom_event(subtype, payload)
}
/// Returns the next signal, rate-limited by the configured poll interval.
///
/// A previously stashed signal is taken and returned first without contacting the
/// backend. Otherwise the backend is polled through `poll_signal_now` only when at least
/// `signal_poll_interval_ms` milliseconds have elapsed since the last poll; if not,
/// `Ok(None)` is returned.
///
/// # Errors
///
/// Propagates any error from `poll_signal_now`.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn poll_signal(&mut self) -> Result<Option<Signal>> {
    if let Some(stashed) = self.pending_signal.take() {
        return Ok(Some(stashed));
    }

    let min_gap = Duration::from_millis(self.signal_poll_interval_ms);
    let since_last_poll = self.last_signal_poll.elapsed();
    if since_last_poll >= min_gap {
        self.poll_signal_now()
    } else {
        Ok(None)
    }
}
/// Polls the backend for signals immediately, bypassing the poll-interval check.
///
/// The last-poll timestamp is updated before the backend is contacted, so it is also
/// updated when the poll fails. A regular signal takes precedence over a custom one. A
/// custom signal is converted into a [`Signal`] of type `SignalType::Resume` carrying the
/// custom payload and checkpoint id.
///
/// # Errors
///
/// Propagates the backend's `poll_signals` error.
pub fn poll_signal_now(&mut self) -> Result<Option<Signal>> {
    self.last_signal_poll = Instant::now();

    let delivered = match self.backend.poll_signals(None)? {
        (Some(sig), _) => {
            debug!(signal_type = ?sig.signal_type, "Signal received");
            Some(sig)
        }
        (None, Some(custom)) => {
            let sdk_signal = Signal {
                signal_type: SignalType::Resume,
                payload: custom.payload,
                checkpoint_id: Some(custom.checkpoint_id),
            };
            debug!("Custom signal received for checkpoint");
            Some(sdk_signal)
        }
        (None, None) => None,
    };
    Ok(delivered)
}
/// Polls the backend for the custom signal identified by `signal_id` and returns its
/// payload, or `None` when the backend returns no custom signal.
///
/// This call is not rate-limited and does not update the last-poll timestamp.
///
/// NOTE(review): any regular signal returned by the same backend call is discarded
/// here; confirm the backend will report it again on a later poll.
///
/// # Errors
///
/// Propagates the backend's `poll_signals` error.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id(), signal_id = %signal_id))]
pub fn poll_custom_signal(&mut self, signal_id: &str) -> Result<Option<Vec<u8>>> {
    let (_regular, custom) = self.backend.poll_signals(Some(signal_id))?;

    match custom {
        Some(found) => {
            debug!(signal_id = %signal_id, "Custom signal received");
            Ok(Some(found.payload))
        }
        None => Ok(None),
    }
}
/// Forwards `signal_type` to the backend's `acknowledge_signal` call and logs on success.
///
/// # Errors
///
/// Propagates the backend's error; nothing is logged in that case.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn acknowledge_signal(&self, signal_type: SignalType) -> Result<()> {
    let backend = &self.backend;
    backend.acknowledge_signal(signal_type)?;
    debug!("Signal acknowledged");
    Ok(())
}
/// Polls via `poll_signal` and fails with `SdkError::Cancelled` if a cancel signal is seen.
///
/// Any other signal is stashed as the pending signal so a later call can handle it.
///
/// NOTE(review): `poll_signal` returns the stashed signal before polling the backend, so
/// while a non-cancel signal stays stashed, repeated calls to this method re-stash it
/// and do not reach the backend. Confirm this is the intended behaviour.
///
/// # Errors
///
/// Returns `SdkError::Cancelled` on a cancel signal, or any error from `poll_signal`.
pub fn check_cancelled(&mut self) -> Result<()> {
    match self.poll_signal()? {
        Some(signal) if signal.signal_type == SignalType::Cancel => Err(SdkError::Cancelled),
        Some(other) => {
            self.pending_signal = Some(other);
            Ok(())
        }
        None => Ok(()),
    }
}
/// Polls via `poll_signal` and fails with `SdkError::Paused` if a pause signal is seen.
///
/// Any other signal is stashed as the pending signal so a later call can handle it.
///
/// NOTE(review): `poll_signal` returns the stashed signal before polling the backend, so
/// while a non-pause signal stays stashed, repeated calls to this method re-stash it
/// and do not reach the backend. Confirm this is the intended behaviour.
///
/// # Errors
///
/// Returns `SdkError::Paused` on a pause signal, or any error from `poll_signal`.
pub fn check_paused(&mut self) -> Result<()> {
    let signal = match self.poll_signal()? {
        Some(signal) => signal,
        None => return Ok(()),
    };

    if signal.signal_type == SignalType::Pause {
        Err(SdkError::Paused)
    } else {
        self.pending_signal = Some(signal);
        Ok(())
    }
}
/// Polls via `poll_signal` and turns cancel and pause signals into errors.
///
/// A resume signal is stashed as the pending signal and `Ok(())` is returned.
///
/// NOTE(review): `poll_signal` returns the stashed signal before polling the backend, so
/// once a resume signal is stashed, repeated calls to this method keep re-stashing it
/// and do not reach the backend. Confirm this is the intended behaviour.
///
/// # Errors
///
/// Returns `SdkError::Cancelled` or `SdkError::Paused` for the matching signal, or any
/// error from `poll_signal`.
pub fn check_signals(&mut self) -> Result<()> {
    let signal = match self.poll_signal()? {
        Some(signal) => signal,
        None => return Ok(()),
    };

    match signal.signal_type {
        SignalType::Cancel => Err(SdkError::Cancelled),
        SignalType::Pause => Err(SdkError::Paused),
        SignalType::Resume => {
            self.pending_signal = Some(signal);
            Ok(())
        }
    }
}
/// Forwards `checkpoint_id`, `attempt_number` and the optional `error_message` to the
/// backend's `record_retry_attempt` call and returns its result.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id(), checkpoint_id = %checkpoint_id, attempt = attempt_number))]
pub fn record_retry_attempt(
    &self,
    checkpoint_id: &str,
    attempt_number: u32,
    error_message: Option<&str>,
) -> Result<()> {
    let backend = &self.backend;
    backend.record_retry_attempt(checkpoint_id, attempt_number, error_message)
}
/// Returns the result of the backend's `get_status` call.
#[instrument(skip(self), fields(instance_id = %self.backend.instance_id()))]
pub fn get_status(&self) -> Result<StatusResponse> {
    let backend = &self.backend;
    backend.get_status()
}
/// Returns the result of the backend's `get_instance_status` call for `instance_id`.
pub fn get_instance_status(&self, instance_id: &str) -> Result<StatusResponse> {
    let backend = &self.backend;
    backend.get_instance_status(instance_id)
}
/// Returns the instance id reported by the backend.
pub fn instance_id(&self) -> &str {
    let id: &str = self.backend.instance_id();
    id
}
/// Returns the tenant id reported by the backend.
pub fn tenant_id(&self) -> &str {
    let id: &str = self.backend.tenant_id();
    id
}
pub fn is_registered(&self) -> bool {
self.registered
}
pub fn heartbeat_interval_ms(&self) -> u64 {
self.heartbeat_interval_ms
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `HttpSdkConfig` with the given ids and the fixed settings shared by
    /// the tests in this module.
    #[cfg(feature = "http")]
    fn http_config(instance_id: &str, tenant_id: &str) -> crate::backend::http::HttpSdkConfig {
        crate::backend::http::HttpSdkConfig {
            instance_id: instance_id.to_string(),
            tenant_id: tenant_id.to_string(),
            base_url: "http://127.0.0.1:8003".to_string(),
            request_timeout_ms: 30_000,
            signal_poll_interval_ms: 1_000,
            heartbeat_interval_ms: 30_000,
        }
    }

    /// A freshly built HTTP SDK reports the configured ids and is not registered.
    #[cfg(feature = "http")]
    #[test]
    fn test_sdk_creation_http() {
        let sdk = RuntaraSdk::new(http_config("test-instance", "test-tenant")).unwrap();

        assert_eq!(sdk.instance_id(), "test-instance");
        assert_eq!(sdk.tenant_id(), "test-tenant");
        assert!(!sdk.is_registered());
    }

    /// A freshly built HTTP SDK is unregistered and has no pending signal.
    #[cfg(feature = "http")]
    #[test]
    fn test_sdk_initial_state() {
        let sdk = RuntaraSdk::new(http_config("test", "test")).unwrap();

        assert!(!sdk.is_registered());
        assert!(sdk.pending_signal.is_none());
    }
}