use std::{cmp::Ordering, collections::VecDeque, time::Duration};
use crate::opensymphony_workflow::{Environment, ResolvedWorkflow};
use futures_util::StreamExt;
use reqwest::{
RequestBuilder,
header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue},
};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
net::TcpStream,
task::yield_now,
time::{Instant, sleep, timeout_at},
};
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async,
tungstenite::{Message, client::IntoClientRequest},
};
use tracing::debug;
use url::{Host, Position, Url};
use uuid::Uuid;
use super::events::{ConversationStateMirror, EventCache, KnownEvent, TerminalExecutionStatus};
use super::models::{
AcceptedResponse, Conversation, ConversationCreateRequest, ConversationRunRequest,
EventEnvelope, SearchConversationEventsResponse, SendMessageRequest,
};
// Header name used for the session API key when authentication is derived from a workflow:
// always for HTTP requests, and for WebSocket requests when header mode is selected.
const SESSION_API_KEY_HEADER_NAME: &str = "x-session-api-key";
/// Classifies the host that a transport base URL points at.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportTargetKind {
    /// The base URL host was recognised as a loopback host.
    Loopback,
    /// Any host that was not recognised as loopback.
    Remote,
}

impl TransportTargetKind {
    /// Returns the stable lowercase label for this target kind.
    pub fn as_str(self) -> &'static str {
        match self {
            TransportTargetKind::Loopback => "loopback",
            TransportTargetKind::Remote => "remote",
        }
    }
}
/// Describes how credentials are attached to a request, without carrying the credential.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportAuthKind {
    /// No credentials are attached.
    None,
    /// Credentials travel in a request header.
    Header,
    /// Credentials travel in a URL query parameter.
    QueryParam,
}

impl TransportAuthKind {
    /// Returns the stable lowercase label for this auth kind.
    pub fn as_str(self) -> &'static str {
        match self {
            TransportAuthKind::None => "none",
            TransportAuthKind::Header => "header",
            TransportAuthKind::QueryParam => "query_param",
        }
    }
}
/// Credential-free summary of a transport configuration, as returned by
/// `TransportConfig::diagnostics`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportDiagnostics {
/// Whether the base URL host is a loopback host or a remote one.
pub target_kind: TransportTargetKind,
/// How HTTP requests are authenticated.
pub http_auth_kind: TransportAuthKind,
/// How WebSocket requests are authenticated.
pub websocket_auth_kind: TransportAuthKind,
/// Name of the WebSocket query parameter carrying the key; `None` unless the
/// WebSocket auth kind is query-parameter based.
pub websocket_query_param_name: Option<String>,
/// `true` when `TransportConfig::managed_local_server_base_url` yields a URL.
pub managed_local_server_candidate: bool,
}
/// WebSocket authentication mode as spelled in workflow configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkflowWebSocketAuthMode {
    Auto,
    Header,
    QueryParam,
}

impl WorkflowWebSocketAuthMode {
    /// Parses a mode string, ignoring surrounding whitespace and ASCII case.
    ///
    /// # Errors
    ///
    /// Returns an invalid-configuration error naming the normalized input when it is not
    /// one of `auto`, `header`, or `query_param`.
    fn parse(mode: &str) -> Result<Self, OpenHandsError> {
        let normalized = mode.trim().to_ascii_lowercase();
        let parsed = match normalized.as_str() {
            "auto" => Self::Auto,
            "header" => Self::Header,
            "query_param" => Self::QueryParam,
            other => {
                return Err(OpenHandsError::invalid_configuration(format!(
                    "unsupported websocket auth mode `{other}`"
                )));
            }
        };
        Ok(parsed)
    }
}
/// A named API key: the header or query-parameter name plus the secret value.
///
/// `Debug` is implemented by hand so that the secret value never appears in debug output
/// of this type or of any type that embeds it.
#[derive(Clone, PartialEq, Eq)]
pub struct ApiKeyAuth {
    name: String,
    value: String,
}

impl std::fmt::Debug for ApiKeyAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The key name is useful for diagnostics; the value is a credential and is masked.
        f.debug_struct("ApiKeyAuth")
            .field("name", &self.name)
            .field("value", &"<redacted>")
            .finish()
    }
}

impl ApiKeyAuth {
    /// Creates a key with the given header/query-parameter `name` and secret `value`.
    pub fn new(name: impl Into<String>, value: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Returns the header or query-parameter name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the secret value.
    pub fn value(&self) -> &str {
        &self.value
    }
}
/// How HTTP requests are authenticated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HttpAuth {
/// No credentials are attached.
None,
/// The key is appended to the request URL as a query parameter.
QueryParam(ApiKeyAuth),
/// The key is sent as a request header.
Header(ApiKeyAuth),
}
/// How WebSocket connection requests are authenticated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WebSocketAuth {
/// No credentials are attached.
None,
/// The key is appended to the WebSocket URL as a query parameter.
QueryParam(ApiKeyAuth),
/// The key is sent as a header on the WebSocket handshake request.
Header(ApiKeyAuth),
}
/// Authentication settings for the two transports; HTTP and WebSocket are configured
/// independently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthConfig {
/// Authentication applied to HTTP requests.
pub http: HttpAuth,
/// Authentication applied to WebSocket connection requests.
pub websocket: WebSocketAuth,
}
impl Default for AuthConfig {
fn default() -> Self {
Self::none()
}
}
impl AuthConfig {
    /// Builds a configuration that attaches no credentials to either transport.
    pub fn none() -> Self {
        Self {
            http: HttpAuth::None,
            websocket: WebSocketAuth::None,
        }
    }

    /// Sends the same key as a query parameter on both HTTP and WebSocket requests.
    pub fn query_param_api_key(name: impl Into<String>, value: impl Into<String>) -> Self {
        let websocket_key = ApiKeyAuth::new(name, value);
        let http_key = websocket_key.clone();
        Self {
            http: HttpAuth::QueryParam(http_key),
            websocket: WebSocketAuth::QueryParam(websocket_key),
        }
    }

    /// Sends the same key as a header on both HTTP and WebSocket requests.
    pub fn header_api_key(name: impl Into<String>, value: impl Into<String>) -> Self {
        let websocket_key = ApiKeyAuth::new(name, value);
        let http_key = websocket_key.clone();
        Self {
            http: HttpAuth::Header(http_key),
            websocket: WebSocketAuth::Header(websocket_key),
        }
    }

    /// Sends the key as a header over HTTP and as a query parameter over WebSocket.
    ///
    /// `header_name` names the HTTP header, `websocket_query_param` names the WebSocket
    /// query parameter, and `value` is the shared secret.
    pub fn header_api_key_with_websocket_query_fallback(
        header_name: impl Into<String>,
        websocket_query_param: impl Into<String>,
        value: impl Into<String>,
    ) -> Self {
        let secret: String = value.into();
        let http_key = ApiKeyAuth::new(header_name, secret.clone());
        let websocket_key = ApiKeyAuth::new(websocket_query_param, secret);
        Self {
            http: HttpAuth::Header(http_key),
            websocket: WebSocketAuth::QueryParam(websocket_key),
        }
    }

    /// Appends the HTTP key to `url` when HTTP auth is query-parameter based.
    fn apply_http_query(&self, url: &mut Url) {
        let HttpAuth::QueryParam(key) = &self.http else {
            return;
        };
        url.query_pairs_mut().append_pair(key.name(), key.value());
    }

    /// Appends the WebSocket key to `url` when WebSocket auth is query-parameter based.
    fn apply_websocket_query(&self, url: &mut Url) {
        let WebSocketAuth::QueryParam(key) = &self.websocket else {
            return;
        };
        url.query_pairs_mut().append_pair(key.name(), key.value());
    }

    /// Adds the HTTP key as a header on `request` when HTTP auth is header based;
    /// otherwise returns `request` untouched.
    ///
    /// # Errors
    ///
    /// Propagates the error from parsing the header name or value.
    fn apply_http_headers(
        &self,
        request: RequestBuilder,
    ) -> Result<RequestBuilder, OpenHandsError> {
        let HttpAuth::Header(key) = &self.http else {
            return Ok(request);
        };
        let header_name = parse_header_name(key.name())?;
        let header_value = parse_header_value(key.value())?;
        Ok(request.header(header_name, header_value))
    }

    /// Inserts the WebSocket key into `headers` when WebSocket auth is header based.
    ///
    /// # Errors
    ///
    /// Propagates the error from parsing the header name or value.
    fn apply_websocket_headers(&self, headers: &mut HeaderMap) -> Result<(), OpenHandsError> {
        let WebSocketAuth::Header(key) = &self.websocket else {
            return Ok(());
        };
        let header_name = parse_header_name(key.name())?;
        let header_value = parse_header_value(key.value())?;
        headers.insert(header_name, header_value);
        Ok(())
    }

    /// Reports the HTTP auth kind without exposing the key.
    fn http_auth_kind(&self) -> TransportAuthKind {
        match &self.http {
            HttpAuth::None => TransportAuthKind::None,
            HttpAuth::Header(_) => TransportAuthKind::Header,
            HttpAuth::QueryParam(_) => TransportAuthKind::QueryParam,
        }
    }

    /// Reports the WebSocket auth kind without exposing the key.
    fn websocket_auth_kind(&self) -> TransportAuthKind {
        match &self.websocket {
            WebSocketAuth::None => TransportAuthKind::None,
            WebSocketAuth::Header(_) => TransportAuthKind::Header,
            WebSocketAuth::QueryParam(_) => TransportAuthKind::QueryParam,
        }
    }

    /// Returns the WebSocket query-parameter name, or `None` when WebSocket auth is not
    /// query-parameter based.
    fn websocket_query_param_name(&self) -> Option<&str> {
        if let WebSocketAuth::QueryParam(key) = &self.websocket {
            Some(key.name())
        } else {
            None
        }
    }
}
/// Where the server lives and how to authenticate against it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportConfig {
// Base URL kept as the caller-supplied string; it is parsed and validated on use.
base_url: String,
// Authentication applied to HTTP and WebSocket requests.
auth: AuthConfig,
}
impl TransportConfig {
    /// Creates a configuration for `base_url` with no authentication.
    ///
    /// The URL is stored as given; validation happens in the methods that parse it.
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            auth: AuthConfig::default(),
        }
    }

    /// Builds a configuration from the OpenHands extension section of a resolved workflow.
    ///
    /// # Errors
    ///
    /// Returns an error when the authentication settings cannot be built from the workflow
    /// and `env`, or when the workflow's base URL fails validation.
    pub fn from_workflow<E: Environment>(
        workflow: &ResolvedWorkflow,
        env: &E,
    ) -> Result<Self, OpenHandsError> {
        let transport = &workflow.extensions.openhands.transport;
        let websocket = &workflow.extensions.openhands.websocket;
        let auth = build_workflow_auth_config(
            transport.session_api_key_env.as_deref(),
            websocket.auth_mode.as_str(),
            websocket.query_param_name.as_str(),
            env,
        )?;
        let config = Self::new(transport.base_url.clone()).with_auth(auth);
        // Parse once up front so an invalid base URL is rejected here rather than on first use.
        config.parsed_base_url()?;
        Ok(config)
    }

    /// Replaces the authentication settings and returns the updated configuration.
    pub fn with_auth(mut self, auth: AuthConfig) -> Self {
        self.auth = auth;
        self
    }

    /// Returns the base URL exactly as it was supplied.
    pub fn base_url(&self) -> &str {
        &self.base_url
    }

    /// Returns the authentication settings.
    pub fn auth(&self) -> &AuthConfig {
        &self.auth
    }

    /// Produces a credential-free summary of this configuration.
    ///
    /// # Errors
    ///
    /// Returns an invalid-configuration error when the base URL fails validation.
    pub fn diagnostics(&self) -> Result<TransportDiagnostics, OpenHandsError> {
        let url = self.parsed_base_url()?;
        let target_kind = if is_loopback_host(url.host()) {
            TransportTargetKind::Loopback
        } else {
            TransportTargetKind::Remote
        };
        Ok(TransportDiagnostics {
            target_kind,
            http_auth_kind: self.auth.http_auth_kind(),
            websocket_auth_kind: self.auth.websocket_auth_kind(),
            websocket_query_param_name: self
                .auth
                .websocket_query_param_name()
                .map(ToOwned::to_owned),
            managed_local_server_candidate: self.managed_local_server_base_url()?.is_some(),
        })
    }

    /// Returns the part of the base URL before the path when the target qualifies as a
    /// managed local server candidate, and `None` otherwise.
    ///
    /// A target qualifies when its host is loopback, its scheme is plain `http`, and neither
    /// HTTP nor WebSocket authentication is configured.
    ///
    /// # Errors
    ///
    /// Returns an invalid-configuration error when the base URL fails validation.
    pub fn managed_local_server_base_url(&self) -> Result<Option<String>, OpenHandsError> {
        let url = self.parsed_base_url()?;
        if !is_loopback_host(url.host()) || url.scheme() != "http" {
            return Ok(None);
        }
        if self.auth.http_auth_kind() != TransportAuthKind::None
            || self.auth.websocket_auth_kind() != TransportAuthKind::None
        {
            return Ok(None);
        }
        Ok(Some(url[..Position::BeforePath].to_string()))
    }

    /// Builds the HTTP URL for `suffix` beneath the base URL's path and applies
    /// query-parameter authentication when configured.
    ///
    /// Trailing slashes on the base path are dropped before `suffix` is appended.
    fn endpoint(&self, suffix: &str) -> Result<Url, OpenHandsError> {
        let mut url = self.parsed_base_url()?;
        let base_path = url.path().trim_end_matches('/');
        let path = format!("{base_path}{suffix}");
        let normalized = if path.is_empty() {
            "/".to_string()
        } else {
            path
        };
        url.set_path(&normalized);
        self.auth.apply_http_query(&mut url);
        Ok(url)
    }

    /// Builds the WebSocket handshake request for a conversation's event socket.
    ///
    /// The base URL scheme is mapped `http` -> `ws` and `https` -> `wss`, the path
    /// `/sockets/events/{conversation_id}` is appended beneath the base path, WebSocket
    /// authentication is applied, and `resend_all=true` is added to the query when
    /// `resend_all` is set.
    ///
    /// # Errors
    ///
    /// Returns an invalid-configuration error when the base URL is invalid, the scheme
    /// cannot be mapped, or the handshake request cannot be constructed. The error text
    /// echoes the URL only up to its path, because the query string may carry the API key.
    fn websocket_request(
        &self,
        conversation_id: Uuid,
        resend_all: bool,
    ) -> Result<tokio_tungstenite::tungstenite::http::Request<()>, OpenHandsError> {
        let mut url = self.parsed_base_url()?;
        let scheme = match url.scheme() {
            "http" => "ws",
            "https" => "wss",
            other => {
                return Err(OpenHandsError::invalid_configuration(format!(
                    "unsupported base URL scheme `{other}`"
                )));
            }
        };
        url.set_scheme(scheme).map_err(|_| {
            OpenHandsError::invalid_configuration(format!(
                "failed to apply websocket scheme `{scheme}`"
            ))
        })?;
        let base_path = url.path().trim_end_matches('/');
        let path = if base_path.is_empty() {
            format!("/sockets/events/{conversation_id}")
        } else {
            format!("{base_path}/sockets/events/{conversation_id}")
        };
        url.set_path(&path);
        self.auth.apply_websocket_query(&mut url);
        if resend_all {
            url.query_pairs_mut().append_pair("resend_all", "true");
        }
        let mut request = url.as_str().into_client_request().map_err(|error| {
            // Deliberately omit the query string: it can contain the session API key.
            OpenHandsError::invalid_configuration(format!(
                "invalid websocket request `{}`: {error}",
                &url[..Position::AfterPath]
            ))
        })?;
        self.auth.apply_websocket_headers(request.headers_mut())?;
        Ok(request)
    }

    /// Applies header-based HTTP authentication to `request` when configured.
    fn apply_http_auth(&self, request: RequestBuilder) -> Result<RequestBuilder, OpenHandsError> {
        self.auth.apply_http_headers(request)
    }

    /// Parses and validates the base URL.
    ///
    /// # Errors
    ///
    /// Returns an invalid-configuration error when the URL does not parse, uses a scheme
    /// other than `http`/`https`, has no host, embeds a username or password, or carries a
    /// query string or fragment.
    fn parsed_base_url(&self) -> Result<Url, OpenHandsError> {
        let url = Url::parse(&self.base_url).map_err(|error| {
            OpenHandsError::invalid_configuration(format!(
                "invalid base URL `{}`: {error}",
                self.base_url
            ))
        })?;
        match url.scheme() {
            "http" | "https" => {}
            other => {
                return Err(OpenHandsError::invalid_configuration(format!(
                    "unsupported base URL scheme `{other}`"
                )));
            }
        }
        if url.host().is_none() {
            return Err(OpenHandsError::invalid_configuration(format!(
                "base URL `{}` must include a host",
                self.base_url
            )));
        }
        if !url.username().is_empty() || url.password().is_some() {
            return Err(OpenHandsError::invalid_configuration(format!(
                "base URL `{}` must not embed credentials",
                self.base_url
            )));
        }
        if url.query().is_some() || url.fragment().is_some() {
            return Err(OpenHandsError::invalid_configuration(format!(
                "base URL `{}` must not include query or fragment suffixes",
                self.base_url
            )));
        }
        Ok(url)
    }
}
/// Errors produced by the OpenHands transport, client, and runtime stream.
#[derive(Debug, thiserror::Error)]
pub enum OpenHandsError {
/// The transport configuration (base URL, auth mode, API key env, ...) is unusable.
#[error("invalid transport configuration: {detail}")]
InvalidConfiguration { detail: String },
/// An HTTP request could not be sent or its body could not be read.
#[error("{operation} transport failed: {detail}")]
Transport {
operation: &'static str,
detail: String,
},
/// The server answered with a non-success HTTP status; `body` is the response text.
#[error("{operation} returned HTTP {status_code}: {body}")]
HttpStatus {
operation: &'static str,
status_code: u16,
body: String,
},
/// A payload could not be serialized, or a response body was missing or undecodable.
#[error("{operation} protocol error: {detail}")]
Protocol {
operation: &'static str,
detail: String,
},
/// A WebSocket operation (for example connecting or closing) failed.
#[error("{operation} websocket failed: {detail}")]
WebSocketTransport {
operation: &'static str,
detail: String,
},
/// A WebSocket payload could not be decoded as an event; `snippet` is a payload prefix.
#[error("websocket event decoding failed: {detail}; payload prefix: {snippet}")]
MalformedWebSocketEvent { detail: String, snippet: String },
/// Readiness was not observed on the WebSocket within the given duration.
#[error("websocket readiness timed out after {0:?}")]
ReadinessTimeout(Duration),
/// Probe run activity was not observed within the given duration.
#[error("probe run activity was not observed after {0:?}")]
ProbeActivityTimeout(Duration),
/// The probe run reported an unhealthy runtime; the payload describes it.
#[error("probe run reported an unhealthy runtime: {0}")]
ProbeRunUnhealthy(String),
/// The WebSocket closed before readiness was observed.
#[error("websocket closed before readiness")]
WebSocketClosed,
/// Every reconnect attempt failed; `last_error` is the text of the final failure.
#[error("runtime stream reconnect exhausted after {attempts} attempt(s): {last_error}")]
ReconnectExhausted { attempts: usize, last_error: String },
}
impl OpenHandsError {
    /// Builds an [`OpenHandsError::InvalidConfiguration`] from any string-like detail.
    fn invalid_configuration(detail: impl Into<String>) -> Self {
        let detail = detail.into();
        Self::InvalidConfiguration { detail }
    }

    /// Builds an [`OpenHandsError::Transport`] whose detail is the rendered `error`.
    fn transport(operation: &'static str, error: impl std::fmt::Display) -> Self {
        let detail = error.to_string();
        Self::Transport { operation, detail }
    }

    /// Builds an [`OpenHandsError::Protocol`] whose detail is the rendered `error`.
    fn protocol(operation: &'static str, error: impl std::fmt::Display) -> Self {
        let detail = error.to_string();
        Self::Protocol { operation, detail }
    }

    /// Builds an [`OpenHandsError::WebSocketTransport`] whose detail is the rendered `error`.
    fn websocket_transport(operation: &'static str, error: impl std::fmt::Display) -> Self {
        let detail = error.to_string();
        Self::WebSocketTransport { operation, detail }
    }
}
// Concrete WebSocket stream type held by the runtime event stream and returned by
// `OpenHandsClient::connect_websocket`.
type RuntimeSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
// Event kind a readiness event must have before it is applied to the state mirror.
const CONVERSATION_STATE_UPDATE_EVENT_KIND: &str = "ConversationStateUpdateEvent";
// Id of the placeholder ready event used while no readiness event has been received.
const UNREADY_EVENT_ID: &str = "runtime-stream-unready";
/// Tuning knobs for a runtime event stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeStreamConfig {
    /// How long to wait for the readiness event after the WebSocket connects.
    pub readiness_timeout: Duration,
    /// Delay before the second reconnect attempt; doubled for each later attempt.
    pub reconnect_initial_backoff: Duration,
    /// Upper bound for the reconnect delay.
    pub reconnect_max_backoff: Duration,
    /// Number of reconnect attempts after which reconnecting gives up.
    pub max_reconnect_attempts: usize,
    /// Passed as the `resend_all` flag when the WebSocket is opened.
    pub replay_existing_events_on_attach: bool,
}

impl Default for RuntimeStreamConfig {
    /// Defaults: 30 s readiness timeout, 1 s initial and 30 s maximum backoff,
    /// 8 reconnect attempts, no replay on attach.
    fn default() -> Self {
        let readiness_timeout = Duration::from_secs(30);
        let reconnect_initial_backoff = Duration::from_secs(1);
        let reconnect_max_backoff = Duration::from_secs(30);
        RuntimeStreamConfig {
            readiness_timeout,
            reconnect_initial_backoff,
            reconnect_max_backoff,
            max_reconnect_attempts: 8,
            replay_existing_events_on_attach: false,
        }
    }
}
/// A conversation's event stream: a WebSocket connection combined with HTTP event
/// reconciliation, an event cache, and a mirror of the conversation state.
pub struct RuntimeEventStream {
// Client used for HTTP calls and for opening the WebSocket.
client: OpenHandsClient,
// Conversation this stream is attached to.
conversation_id: Uuid,
// Readiness/reconnect tuning.
config: RuntimeStreamConfig,
// Live WebSocket; `None` before attaching, after closing, and while disconnected.
socket: Option<RuntimeSocket>,
// Most recently fetched conversation snapshot.
conversation: Conversation,
// Readiness event from the current connection, or the unready placeholder.
ready_event: EventEnvelope,
// Events seen so far; only events newly merged into it are queued for delivery.
event_cache: EventCache,
// Conversation state derived from the snapshot, cached events, and the ready event.
state_mirror: ConversationStateMirror,
// Events awaiting delivery, kept ordered by timestamp and then id.
pending_events: VecDeque<EventEnvelope>,
// Set when new events were merged; triggers one extra drain before delivery.
pending_delivery_needs_drain: bool,
// Set when the socket dropped while events were still pending; reconnect happens
// once the pending queue has been delivered.
reconnect_pending: bool,
}
impl RuntimeEventStream {
    /// Creates a detached stream seeded with `conversation`.
    ///
    /// No socket is opened; the ready event starts as the unready placeholder and the state
    /// mirror is initialised from `conversation`.
    fn new(
        client: OpenHandsClient,
        conversation_id: Uuid,
        config: RuntimeStreamConfig,
        conversation: Conversation,
    ) -> Self {
        let ready_event = EventEnvelope::state_update(UNREADY_EVENT_ID, "idle");
        let mut state_mirror = ConversationStateMirror::default();
        state_mirror.apply_conversation(&conversation);
        Self {
            client,
            conversation_id,
            config,
            socket: None,
            conversation,
            ready_event,
            event_cache: EventCache::new(),
            state_mirror,
            pending_events: VecDeque::new(),
            pending_delivery_needs_drain: false,
            reconnect_pending: false,
        }
    }

    /// Returns the most recently fetched conversation snapshot.
    pub fn conversation(&self) -> &Conversation {
        &self.conversation
    }

    /// Returns the readiness event of the current connection, or the unready placeholder.
    pub fn ready_event(&self) -> &EventEnvelope {
        &self.ready_event
    }

    /// Returns the cache of events seen so far.
    pub fn event_cache(&self) -> &EventCache {
        &self.event_cache
    }

    /// Returns the mirrored conversation state.
    pub fn state_mirror(&self) -> &ConversationStateMirror {
        &self.state_mirror
    }

    /// Fetches every event over HTTP and queues the ones not seen before.
    ///
    /// Returns the number of newly inserted events.
    pub async fn reconcile_events(&mut self) -> Result<usize, OpenHandsError> {
        let reconciled = self.client.search_all_events(self.conversation_id).await?;
        Ok(self.push_new_events(reconciled.items().iter().cloned(), true))
    }

    /// Fetches up to `limit` recent events over HTTP and queues the ones not seen before.
    ///
    /// Returns the number of newly inserted events.
    pub async fn reconcile_recent_events(&mut self, limit: usize) -> Result<usize, OpenHandsError> {
        let reconciled = self
            .client
            .search_recent_events(self.conversation_id, limit)
            .await?;
        Ok(self.push_new_events(reconciled.items().iter().cloned(), true))
    }

    /// Returns the next event in delivery order.
    ///
    /// Yields `Ok(None)` only when there is no socket, nothing is pending, and no reconnect
    /// is outstanding.
    ///
    /// # Errors
    ///
    /// Propagates reconnect and reconciliation failures.
    pub async fn next_event(&mut self) -> Result<Option<EventEnvelope>, OpenHandsError> {
        loop {
            if let Some(event) = self.poll_next_event_once().await? {
                return Ok(Some(event));
            }
            if self.socket.is_none() && self.pending_events.is_empty() && !self.reconnect_pending {
                return Ok(None);
            }
        }
    }

    /// Performs one step of the delivery loop; `Ok(None)` means "nothing to hand out on this
    /// step", not necessarily end of stream.
    async fn poll_next_event_once(&mut self) -> Result<Option<EventEnvelope>, OpenHandsError> {
        self.absorb_buffered_socket_events().await?;
        if self.defer_pending_event_delivery_once().await? {
            return Ok(None);
        }
        if let Some(event) = self.pending_events.pop_front() {
            return Ok(Some(event));
        }
        // The queue is empty here, so a reconnect postponed earlier can run now.
        if self.reconnect_pending {
            self.reconnect_pending = false;
            self.reconnect().await?;
            if self.defer_pending_event_delivery_once().await? {
                return Ok(None);
            }
            if let Some(event) = self.pending_events.pop_front() {
                return Ok(Some(event));
            }
        }
        let stream_read = {
            let Some(socket) = self.socket.as_mut() else {
                return Ok(None);
            };
            read_next_socket_event(socket).await
        };
        match stream_read {
            StreamRead::Event(event) => {
                // Collect whatever else is already buffered so the batch is ordered together.
                let mut drained_events = vec![event];
                let reconnect_signal = self.drain_buffered_socket_events(&mut drained_events).await;
                self.push_new_events(drained_events, true);
                self.handle_reconnect_signal(reconnect_signal).await?;
                Ok(None)
            }
            signal @ (StreamRead::Closed | StreamRead::Transport(_)) => {
                self.handle_reconnect_signal(Some(signal)).await?;
                if self.defer_pending_event_delivery_once().await? {
                    return Ok(None);
                }
                Ok(self.pending_events.pop_front())
            }
        }
    }

    /// Drains events already buffered on the socket into the cache and pending queue, and
    /// reacts to a close or read failure encountered while draining. No-op without a socket.
    async fn absorb_buffered_socket_events(&mut self) -> Result<(), OpenHandsError> {
        if self.socket.is_none() {
            return Ok(());
        }
        let mut drained_events = Vec::new();
        let reconnect_signal = self.drain_buffered_socket_events(&mut drained_events).await;
        self.push_new_events(drained_events, true);
        self.handle_reconnect_signal(reconnect_signal).await
    }

    /// After new events were merged, yields once and drains the socket again before any
    /// pending event is delivered.
    ///
    /// Returns `true` when delivery was deferred on this call.
    async fn defer_pending_event_delivery_once(&mut self) -> Result<bool, OpenHandsError> {
        if !self.pending_delivery_needs_drain || self.pending_events.is_empty() {
            return Ok(false);
        }
        self.pending_delivery_needs_drain = false;
        yield_now().await;
        self.absorb_buffered_socket_events().await?;
        Ok(true)
    }

    /// Pushes every buffered socket event onto `drained_events`.
    ///
    /// Returns the close/transport read that ended draining, or `None` when the buffer ran
    /// dry.
    ///
    /// # Panics
    ///
    /// Panics when called without a socket.
    async fn drain_buffered_socket_events(
        &mut self,
        drained_events: &mut Vec<EventEnvelope>,
    ) -> Option<StreamRead> {
        loop {
            let next = {
                let socket = self
                    .socket
                    .as_mut()
                    .expect("socket should be present while draining buffered events");
                read_buffered_socket_event(socket).await
            };
            match next {
                Some(StreamRead::Event(event)) => drained_events.push(event),
                Some(StreamRead::Closed) => return Some(StreamRead::Closed),
                Some(StreamRead::Transport(error)) => {
                    return Some(StreamRead::Transport(error));
                }
                None => return None,
            }
        }
    }

    /// Reacts to a socket close or read failure: drops the socket, then reconnects at once
    /// when nothing is pending, or postpones the reconnect until the queue is delivered.
    ///
    /// `None` is a no-op.
    ///
    /// # Panics
    ///
    /// Panics when handed `StreamRead::Event`, which draining never returns as a signal.
    async fn handle_reconnect_signal(
        &mut self,
        reconnect_signal: Option<StreamRead>,
    ) -> Result<(), OpenHandsError> {
        match reconnect_signal {
            None => return Ok(()),
            Some(StreamRead::Event(_)) => {
                unreachable!("buffered socket draining should not return nested stream events")
            }
            Some(StreamRead::Closed) => {}
            Some(StreamRead::Transport(error)) => {
                debug!(
                    error = %error,
                    "runtime websocket read failed while draining buffered events; attempting reconnect"
                );
            }
        }
        self.socket.take();
        if self.pending_events.is_empty() {
            self.reconnect().await?;
        } else {
            self.reconnect_pending = true;
        }
        Ok(())
    }

    /// Resets delivery state, discards pending events, and closes the socket if one is open.
    ///
    /// # Errors
    ///
    /// Returns a WebSocket transport error when closing the socket fails.
    pub async fn close(&mut self) -> Result<(), OpenHandsError> {
        self.clear_ready_event();
        self.pending_delivery_needs_drain = false;
        self.reconnect_pending = false;
        self.pending_events.clear();
        if let Some(mut socket) = self.socket.take() {
            socket.close(None).await.map_err(|error| {
                OpenHandsError::websocket_transport("close runtime stream", error)
            })?;
        }
        Ok(())
    }

    /// Refreshes the conversation, connects, waits for readiness, and reconciles all events.
    async fn attach(mut self) -> Result<Self, OpenHandsError> {
        self.refresh_conversation().await?;
        self.connect_ready_and_reconcile().await?;
        Ok(self)
    }

    /// Like [`Self::attach`], but reconciles only up to `limit` recent events.
    async fn attach_with_recent_events(mut self, limit: usize) -> Result<Self, OpenHandsError> {
        self.refresh_conversation().await?;
        self.connect_ready_and_reconcile_recent(limit).await?;
        Ok(self)
    }

    /// Re-fetches the conversation snapshot and rebuilds the state mirror from it.
    async fn refresh_conversation(&mut self) -> Result<(), OpenHandsError> {
        self.conversation = self.client.get_conversation(self.conversation_id).await?;
        self.rebuild_state_mirror();
        Ok(())
    }

    /// Re-establishes the stream, retrying with exponential backoff.
    ///
    /// The first attempt runs immediately. Each later attempt sleeps first, and the delay
    /// doubles up to `reconnect_max_backoff`.
    ///
    /// # Errors
    ///
    /// Returns [`OpenHandsError::ReconnectExhausted`] carrying the last failure once the
    /// attempt count reaches `max_reconnect_attempts`.
    async fn reconnect(&mut self) -> Result<(), OpenHandsError> {
        self.clear_ready_event();
        let mut attempts = 0usize;
        let mut delay = self.config.reconnect_initial_backoff;
        loop {
            attempts += 1;
            if attempts > 1 {
                sleep(delay).await;
                delay = delay
                    .saturating_mul(2)
                    .min(self.config.reconnect_max_backoff);
            }
            let error = match self.refresh_conversation().await {
                Ok(()) => match self.connect_ready_and_reconcile().await {
                    Ok(()) => return Ok(()),
                    Err(error) => error,
                },
                Err(error) => error,
            };
            if attempts >= self.config.max_reconnect_attempts {
                return Err(OpenHandsError::ReconnectExhausted {
                    attempts,
                    last_error: error.to_string(),
                });
            }
        }
    }

    /// Opens the WebSocket, waits for readiness, then stores the readiness event and socket.
    ///
    /// Nothing is stored when connecting or waiting for readiness fails.
    async fn connect_and_await_ready(&mut self) -> Result<(), OpenHandsError> {
        let mut socket = self
            .client
            .connect_websocket(
                self.conversation_id,
                self.config.replay_existing_events_on_attach,
            )
            .await?;
        self.ready_event =
            wait_for_readiness_on_stream(&mut socket, self.config.readiness_timeout).await?;
        self.socket = Some(socket);
        Ok(())
    }

    /// Connects, waits for readiness, then reconciles against the full event history.
    async fn connect_ready_and_reconcile(&mut self) -> Result<(), OpenHandsError> {
        self.connect_and_await_ready().await?;
        let reconciled = self.client.search_all_events(self.conversation_id).await?;
        self.push_new_events(reconciled.items().iter().cloned(), true);
        self.rebuild_state_mirror();
        Ok(())
    }

    /// Connects, waits for readiness, then reconciles against up to `limit` recent events.
    async fn connect_ready_and_reconcile_recent(
        &mut self,
        limit: usize,
    ) -> Result<(), OpenHandsError> {
        self.connect_and_await_ready().await?;
        let reconciled = self
            .client
            .search_recent_events(self.conversation_id, limit)
            .await?;
        self.push_new_events(reconciled.items().iter().cloned(), true);
        self.rebuild_state_mirror();
        Ok(())
    }

    /// Merges `events` into the cache and returns how many were newly inserted.
    ///
    /// When anything was inserted: one extra drain is requested before delivery, the new
    /// events are queued if `queue_new` is set, and the state mirror is rebuilt if any of
    /// them is a conversation state update.
    fn push_new_events<I>(&mut self, events: I, queue_new: bool) -> usize
    where
        I: IntoIterator<Item = EventEnvelope>,
    {
        let inserted = self.event_cache.merge_new_events(events);
        if inserted.is_empty() {
            return 0;
        }
        self.pending_delivery_needs_drain = true;
        if queue_new {
            self.queue_pending_events(&inserted);
        }
        if inserted.iter().any(|event| {
            matches!(
                KnownEvent::from_envelope(event),
                KnownEvent::ConversationStateUpdate(_)
            )
        }) {
            self.rebuild_state_mirror();
        }
        inserted.len()
    }

    /// Inserts each event before the first pending event that orders after it, which keeps
    /// the queue sorted by timestamp and then id.
    fn queue_pending_events(&mut self, inserted: &[EventEnvelope]) {
        for event in inserted {
            let position = self
                .pending_events
                .iter()
                .position(|pending| compare_pending_events(pending, event) == Ordering::Greater)
                .unwrap_or(self.pending_events.len());
            self.pending_events.insert(position, event.clone());
        }
    }

    /// Rebuilds the mirror from the conversation snapshot and cached events, then layers the
    /// terminal-status fallback and the ready event on top, in that order.
    fn rebuild_state_mirror(&mut self) {
        self.state_mirror
            .rebuild_from(&self.conversation, self.event_cache.items());
        self.apply_terminal_conversation_fallback();
        self.apply_ready_event_to_state_mirror();
    }

    /// Resets the ready event to the unready placeholder.
    fn clear_ready_event(&mut self) {
        self.ready_event = EventEnvelope::state_update(UNREADY_EVENT_ID, "idle");
    }

    /// Applies the readiness event to the mirror unless one of these holds:
    ///
    /// - it is the unready placeholder or not a conversation-state-update event;
    /// - it does not decode as a conversation state update;
    /// - the cache already holds a state update ordered at or after it;
    /// - the mirror is already terminal and the ready event's status is neither terminal
    ///   (`finished`/`error`/`stuck`) nor a restart (`queued`/`running`).
    fn apply_ready_event_to_state_mirror(&mut self) {
        if self.ready_event.id == UNREADY_EVENT_ID
            || self.ready_event.kind != CONVERSATION_STATE_UPDATE_EVENT_KIND
        {
            return;
        }
        let KnownEvent::ConversationStateUpdate(payload) =
            KnownEvent::from_envelope(&self.ready_event)
        else {
            return;
        };
        let cache_already_has_same_or_newer_state = self.event_cache.items().iter().any(|event| {
            compare_pending_events(event, &self.ready_event) != Ordering::Less
                && matches!(
                    KnownEvent::from_envelope(event),
                    KnownEvent::ConversationStateUpdate(_)
                )
        });
        if cache_already_has_same_or_newer_state {
            return;
        }
        let ready_event_is_terminal = matches!(
            payload.execution_status.as_deref(),
            Some("finished" | "error" | "stuck")
        );
        let ready_event_restarts_execution = matches!(
            payload.execution_status.as_deref(),
            Some("queued" | "running")
        );
        if self.state_mirror.terminal_status().is_some()
            && !ready_event_is_terminal
            && !ready_event_restarts_execution
        {
            return;
        }
        self.state_mirror.apply_event(&self.ready_event);
    }

    /// Applies the conversation snapshot's execution status to the mirror when all of these
    /// hold: the snapshot is terminal (`finished`/`error`/`stuck`), the latest cached state
    /// update carrying an execution status is not `queued`/`running`, and the mirror reports
    /// no terminal status yet.
    fn apply_terminal_conversation_fallback(&mut self) {
        let latest_cached_execution_status = self.event_cache.items().iter().rev().find_map(
            |event| match KnownEvent::from_envelope(event) {
                KnownEvent::ConversationStateUpdate(payload) => payload.execution_status,
                _ => None,
            },
        );
        let cached_state_restarts_execution = matches!(
            latest_cached_execution_status.as_deref(),
            Some("queued" | "running")
        );
        if matches!(
            self.conversation.execution_status.as_str(),
            "finished" | "error" | "stuck"
        ) && !cached_state_restarts_execution
            && self.state_mirror.terminal_status().is_none()
        {
            self.state_mirror
                .apply_conversation_execution_status(&self.conversation);
        }
    }
}
/// Orders two events by timestamp, using the event id to break ties.
fn compare_pending_events(left: &EventEnvelope, right: &EventEnvelope) -> Ordering {
    match left.timestamp.cmp(&right.timestamp) {
        Ordering::Equal => left.id.cmp(&right.id),
        by_timestamp => by_timestamp,
    }
}
/// Outcome of reading from the runtime WebSocket.
#[derive(Debug)]
enum StreamRead {
/// An event was read.
Event(EventEnvelope),
/// The socket was closed.
Closed,
/// Reading failed with the given error.
Transport(OpenHandsError),
}
/// Snapshot of a completed probe run, captured from the stream just before it is closed.
#[derive(Debug, Clone)]
pub struct OpenHandsProbeResult {
/// Conversation snapshot, with its execution status overridden by the mirrored status
/// when one is available.
pub conversation: Conversation,
/// Readiness event of the stream at the time of capture.
pub ready_event: EventEnvelope,
/// Events observed during the probe.
pub event_cache: EventCache,
/// Mirrored conversation state at the time of capture.
pub state_mirror: ConversationStateMirror,
}
/// HTTP and WebSocket client for an OpenHands server.
#[derive(Clone)]
pub struct OpenHandsClient {
// HTTP client used for every HTTP request.
http: reqwest::Client,
// Base URL and authentication settings.
transport: TransportConfig,
}
impl OpenHandsClient {
    /// Creates a client for `transport` backed by a fresh `reqwest::Client`.
    pub fn new(transport: TransportConfig) -> Self {
        Self {
            http: reqwest::Client::new(),
            transport,
        }
    }

    /// Returns the configured base URL string.
    pub fn base_url(&self) -> &str {
        self.transport.base_url()
    }

    /// Returns a credential-free summary of the transport configuration.
    pub fn transport_diagnostics(&self) -> Result<TransportDiagnostics, OpenHandsError> {
        self.transport.diagnostics()
    }

    /// Requests `/openapi.json` and succeeds when the server answers with a success status
    /// and a body that is either blank or valid JSON.
    pub async fn openapi_probe(&self) -> Result<(), OpenHandsError> {
        let response = send(self.get_request("/openapi.json")?, "probe OpenAPI").await?;
        read_success_body(response, "probe OpenAPI")
            .await
            .map(|_| ())
    }

    /// Creates a conversation via `POST /api/conversations`.
    pub async fn create_conversation(
        &self,
        request: &ConversationCreateRequest,
    ) -> Result<Conversation, OpenHandsError> {
        let response = send(
            self.json_request(
                self.post_request("/api/conversations")?,
                "create conversation",
                request,
            )?,
            "create conversation",
        )
        .await?;
        decode_json(response, "create conversation").await
    }

    /// Fetches a conversation via `GET /api/conversations/{conversation_id}`.
    pub async fn get_conversation(
        &self,
        conversation_id: Uuid,
    ) -> Result<Conversation, OpenHandsError> {
        let response = send(
            self.get_request(&format!("/api/conversations/{conversation_id}"))?,
            "fetch conversation",
        )
        .await?;
        decode_json(response, "fetch conversation").await
    }

    /// Deletes a conversation via `DELETE /api/conversations/{conversation_id}`.
    pub async fn delete_conversation(&self, conversation_id: Uuid) -> Result<(), OpenHandsError> {
        let response = send(
            self.delete_request(&format!("/api/conversations/{conversation_id}"))?,
            "delete conversation",
        )
        .await?;
        read_success_body(response, "delete conversation").await?;
        Ok(())
    }

    /// Posts `request` to `/api/conversations/{conversation_id}/events`.
    pub async fn send_message(
        &self,
        conversation_id: Uuid,
        request: &SendMessageRequest,
    ) -> Result<AcceptedResponse, OpenHandsError> {
        let response = send(
            self.json_request(
                self.post_request(&format!("/api/conversations/{conversation_id}/events"))?,
                "send conversation event",
                request,
            )?,
            "send conversation event",
        )
        .await?;
        decode_accepted(response, "send conversation event").await
    }

    /// Posts a default run request to `/api/conversations/{conversation_id}/run`.
    pub async fn run_conversation(
        &self,
        conversation_id: Uuid,
    ) -> Result<AcceptedResponse, OpenHandsError> {
        let response = send(
            self.json_request(
                self.post_request(&format!("/api/conversations/{conversation_id}/run"))?,
                "trigger conversation run",
                &ConversationRunRequest::default(),
            )?,
            "trigger conversation run",
        )
        .await?;
        decode_accepted(response, "trigger conversation run").await
    }

    /// Fetches one page of events, optionally starting at the cursor `page_id`.
    pub async fn search_events_page(
        &self,
        conversation_id: Uuid,
        page_id: Option<&str>,
    ) -> Result<SearchConversationEventsResponse, OpenHandsError> {
        self.search_events_page_with_options(conversation_id, page_id, None, None)
            .await
    }

    /// Fetches one page of events from `/api/conversations/{conversation_id}/events/search`.
    ///
    /// `page_id`, `limit`, and `sort_order` are sent as query parameters only when given;
    /// a `limit` of zero is sent as `1`.
    async fn search_events_page_with_options(
        &self,
        conversation_id: Uuid,
        page_id: Option<&str>,
        limit: Option<usize>,
        sort_order: Option<&str>,
    ) -> Result<SearchConversationEventsResponse, OpenHandsError> {
        let mut url = self.transport.endpoint(&format!(
            "/api/conversations/{conversation_id}/events/search"
        ))?;
        if let Some(page_id) = page_id {
            url.query_pairs_mut().append_pair("page_id", page_id);
        }
        if let Some(limit) = limit {
            url.query_pairs_mut()
                .append_pair("limit", &limit.max(1).to_string());
        }
        if let Some(sort_order) = sort_order {
            url.query_pairs_mut().append_pair("sort_order", sort_order);
        }
        let response = send(
            self.transport.apply_http_auth(self.http.get(url))?,
            "search conversation events",
        )
        .await?;
        decode_json(response, "search conversation events").await
    }

    /// Fetches every event of a conversation by following `next_page_id` cursors until the
    /// server stops returning one.
    ///
    /// # Errors
    ///
    /// Propagates page-fetch failures. Returns a protocol error when the server hands back
    /// the same cursor that was just requested, since following it would never terminate.
    pub async fn search_all_events(
        &self,
        conversation_id: Uuid,
    ) -> Result<EventCache, OpenHandsError> {
        let mut page_id: Option<String> = None;
        let mut cache = EventCache::new();
        loop {
            let page = self
                .search_events_page(conversation_id, page_id.as_deref())
                .await?;
            cache.extend(page.events);
            match page.next_page_id {
                Some(next_page_id) => {
                    if page_id.as_deref() == Some(next_page_id.as_str()) {
                        return Err(OpenHandsError::protocol(
                            "search conversation events",
                            "pagination cursor did not advance",
                        ));
                    }
                    page_id = Some(next_page_id);
                }
                None => return Ok(cache),
            }
        }
    }

    /// Fetches a single page of up to `limit` events sorted with `TIMESTAMP_DESC`.
    pub async fn search_recent_events(
        &self,
        conversation_id: Uuid,
        limit: usize,
    ) -> Result<EventCache, OpenHandsError> {
        let page = self
            .search_events_page_with_options(
                conversation_id,
                None,
                Some(limit),
                Some("TIMESTAMP_DESC"),
            )
            .await?;
        let mut cache = EventCache::new();
        cache.extend(page.events);
        Ok(cache)
    }

    /// Attaches a runtime event stream that reconciles against the full event history.
    pub async fn attach_runtime_stream(
        &self,
        conversation_id: Uuid,
        config: RuntimeStreamConfig,
    ) -> Result<RuntimeEventStream, OpenHandsError> {
        let conversation = self.get_conversation(conversation_id).await?;
        RuntimeEventStream::new(self.clone(), conversation_id, config, conversation)
            .attach()
            .await
    }

    /// Attaches a runtime event stream that reconciles against up to `recent_limit` recent
    /// events only.
    pub async fn attach_runtime_stream_with_recent_events(
        &self,
        conversation_id: Uuid,
        config: RuntimeStreamConfig,
        recent_limit: usize,
    ) -> Result<RuntimeEventStream, OpenHandsError> {
        let conversation = self.get_conversation(conversation_id).await?;
        RuntimeEventStream::new(self.clone(), conversation_id, config, conversation)
            .attach_with_recent_events(recent_limit)
            .await
    }

    /// Opens a WebSocket (without replay) and waits up to `wait_timeout` for readiness.
    ///
    /// Returns the readiness event; the socket is dropped afterwards.
    pub async fn wait_for_readiness(
        &self,
        conversation_id: Uuid,
        wait_timeout: Duration,
    ) -> Result<EventEnvelope, OpenHandsError> {
        let mut stream = self.connect_websocket(conversation_id, false).await?;
        wait_for_readiness_on_stream(&mut stream, wait_timeout).await
    }

    /// Runs a probe conversation with the built-in doctor prompt.
    pub async fn run_probe(
        &self,
        request: &ConversationCreateRequest,
        wait_timeout: Duration,
    ) -> Result<OpenHandsProbeResult, OpenHandsError> {
        self.run_probe_with_message(
            request,
            "Reply with the exact text `OpenSymphony doctor probe OK` and then finish.",
            wait_timeout,
        )
        .await
    }

    /// Runs a probe conversation: create it, attach a stream, send `prompt`, trigger a run,
    /// wait for the probe to reach a terminal state, then close the stream.
    ///
    /// `wait_timeout` is used both as the stream's readiness timeout and as the timeout for
    /// the terminal-state wait. The returned conversation's execution status is replaced by
    /// the mirrored status when the mirror has one.
    pub async fn run_probe_with_message(
        &self,
        request: &ConversationCreateRequest,
        prompt: &str,
        wait_timeout: Duration,
    ) -> Result<OpenHandsProbeResult, OpenHandsError> {
        let conversation = self.create_conversation(request).await?;
        let mut stream = self
            .attach_runtime_stream(
                conversation.conversation_id,
                RuntimeStreamConfig {
                    readiness_timeout: wait_timeout,
                    reconnect_initial_backoff: Duration::from_millis(100),
                    reconnect_max_backoff: Duration::from_secs(1),
                    max_reconnect_attempts: 4,
                    replay_existing_events_on_attach: false,
                },
            )
            .await?;
        self.send_message(
            conversation.conversation_id,
            &SendMessageRequest::user_text(prompt),
        )
        .await?;
        self.run_conversation(conversation.conversation_id).await?;
        wait_for_probe_terminal_state(&mut stream, wait_timeout).await?;
        // Capture everything before closing: close() resets the ready event.
        let ready_event = stream.ready_event().clone();
        let event_cache = stream.event_cache().clone();
        let state_mirror = stream.state_mirror().clone();
        let mut conversation = stream.conversation().clone();
        if let Some(status) = state_mirror.execution_status() {
            conversation.execution_status = status.to_string();
        }
        stream.close().await?;
        Ok(OpenHandsProbeResult {
            conversation,
            ready_event,
            event_cache,
            state_mirror,
        })
    }

    /// Builds an authenticated `GET` request for `suffix` under the base URL.
    fn get_request(&self, suffix: &str) -> Result<RequestBuilder, OpenHandsError> {
        let url = self.transport.endpoint(suffix)?;
        self.transport.apply_http_auth(self.http.get(url))
    }

    /// Builds an authenticated `POST` request for `suffix` under the base URL.
    fn post_request(&self, suffix: &str) -> Result<RequestBuilder, OpenHandsError> {
        let url = self.transport.endpoint(suffix)?;
        self.transport.apply_http_auth(self.http.post(url))
    }

    /// Builds an authenticated `DELETE` request for `suffix` under the base URL.
    fn delete_request(&self, suffix: &str) -> Result<RequestBuilder, OpenHandsError> {
        let url = self.transport.endpoint(suffix)?;
        self.transport.apply_http_auth(self.http.delete(url))
    }

    /// Serializes `payload` as the JSON body of `request` and sets the JSON content type.
    ///
    /// # Errors
    ///
    /// Returns a protocol error tagged with `operation` when serialization fails.
    fn json_request<T>(
        &self,
        request: RequestBuilder,
        operation: &'static str,
        payload: &T,
    ) -> Result<RequestBuilder, OpenHandsError>
    where
        T: Serialize,
    {
        let body = serde_json::to_vec(payload)
            .map_err(|error| OpenHandsError::protocol(operation, error))?;
        Ok(request.header(CONTENT_TYPE, "application/json").body(body))
    }

    /// Opens the event WebSocket for a conversation; `resend_all` is forwarded to the
    /// handshake request.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when the request cannot be built, or a WebSocket
    /// transport error when connecting fails.
    async fn connect_websocket(
        &self,
        conversation_id: Uuid,
        resend_all: bool,
    ) -> Result<RuntimeSocket, OpenHandsError> {
        let ws_request = self
            .transport
            .websocket_request(conversation_id, resend_all)?;
        let (stream, _) = connect_async(ws_request).await.map_err(|error| {
            OpenHandsError::websocket_transport("connect runtime stream", error)
        })?;
        Ok(stream)
    }
}
/// Derives the authentication settings for a workflow.
///
/// Without a session API key env name, mode `auto` yields no authentication and the other
/// modes are rejected. With one, the key is read from `env` and trimmed, HTTP always uses
/// the session API key header, and the WebSocket uses the header in `header` mode or the
/// query parameter `websocket_query_param_name` in `auto`/`query_param` mode.
///
/// # Errors
///
/// Returns an invalid-configuration error when the mode is unknown, when a non-`auto` mode
/// is requested without a key env name, or when the env variable is unset or blank.
fn build_workflow_auth_config<E: Environment>(
    session_api_key_env: Option<&str>,
    websocket_auth_mode: &str,
    websocket_query_param_name: &str,
    env: &E,
) -> Result<AuthConfig, OpenHandsError> {
    let websocket_auth_mode = WorkflowWebSocketAuthMode::parse(websocket_auth_mode)?;

    let Some(session_api_key_env) = session_api_key_env else {
        return match websocket_auth_mode {
            WorkflowWebSocketAuthMode::Auto => Ok(AuthConfig::none()),
            WorkflowWebSocketAuthMode::Header | WorkflowWebSocketAuthMode::QueryParam => {
                let label = websocket_auth_mode_label(websocket_auth_mode);
                Err(OpenHandsError::invalid_configuration(format!(
                    "websocket auth mode `{}` requires a configured session API key env",
                    label
                )))
            }
        };
    };

    let trimmed_key = env
        .get(session_api_key_env)
        .map(|value| value.trim().to_owned())
        .filter(|value| !value.is_empty());
    let Some(session_api_key) = trimmed_key else {
        return Err(OpenHandsError::invalid_configuration(format!(
            "session API key env `{session_api_key_env}` is not set or is blank"
        )));
    };

    let http_key = ApiKeyAuth::new(SESSION_API_KEY_HEADER_NAME, session_api_key.clone());
    let websocket = match websocket_auth_mode {
        WorkflowWebSocketAuthMode::Header => {
            WebSocketAuth::Header(ApiKeyAuth::new(SESSION_API_KEY_HEADER_NAME, session_api_key))
        }
        WorkflowWebSocketAuthMode::Auto | WorkflowWebSocketAuthMode::QueryParam => {
            WebSocketAuth::QueryParam(ApiKeyAuth::new(websocket_query_param_name, session_api_key))
        }
    };

    Ok(AuthConfig {
        http: HttpAuth::Header(http_key),
        websocket,
    })
}
fn websocket_auth_mode_label(mode: WorkflowWebSocketAuthMode) -> &'static str {
match mode {
WorkflowWebSocketAuthMode::Auto => "auto",
WorkflowWebSocketAuthMode::Header => "header",
WorkflowWebSocketAuthMode::QueryParam => "query_param",
}
}
/// Reports whether `host` refers to the local machine.
///
/// IP hosts are checked with their `is_loopback` method; a domain counts only
/// when it equals `localhost` (ASCII case-insensitive). An absent host is not
/// loopback.
fn is_loopback_host(host: Option<Host<&str>>) -> bool {
    let Some(host) = host else {
        return false;
    };
    match host {
        Host::Domain(name) => name.eq_ignore_ascii_case("localhost"),
        Host::Ipv4(v4) => v4.is_loopback(),
        Host::Ipv6(v6) => v6.is_loopback(),
    }
}
/// Sends `request` and returns the raw response.
///
/// # Errors
/// A failure to send is wrapped as a transport error tagged with `operation`.
async fn send(
    request: RequestBuilder,
    operation: &'static str,
) -> Result<reqwest::Response, OpenHandsError> {
    match request.send().await {
        Ok(response) => Ok(response),
        Err(send_error) => Err(OpenHandsError::transport(operation, send_error)),
    }
}
/// Reads the response body and, for a successful status, parses it as JSON.
///
/// Returns `Ok(None)` when a successful response has an empty or
/// whitespace-only body.
///
/// # Errors
/// * Transport error when the body cannot be read.
/// * `OpenHandsError::HttpStatus` (carrying the status code and body text)
///   for any non-success status.
/// * Protocol error when a non-empty body is not valid JSON.
async fn read_success_body(
    response: reqwest::Response,
    operation: &'static str,
) -> Result<Option<Value>, OpenHandsError> {
    // Capture the status first: reading the text consumes the response.
    let status = response.status();
    let body = match response.text().await {
        Ok(text) => text,
        Err(read_error) => return Err(OpenHandsError::transport(operation, read_error)),
    };

    if !status.is_success() {
        return Err(OpenHandsError::HttpStatus {
            operation,
            status_code: status.as_u16(),
            body,
        });
    }
    if body.trim().is_empty() {
        return Ok(None);
    }

    match serde_json::from_str::<Value>(&body) {
        Ok(json) => Ok(Some(json)),
        Err(parse_error) => Err(OpenHandsError::protocol(operation, parse_error)),
    }
}
/// Decodes a successful response body into `T`.
///
/// # Errors
/// Propagates everything `read_success_body` reports, and returns a protocol
/// error when the body is empty or does not deserialize into `T`.
async fn decode_json<T>(
    response: reqwest::Response,
    operation: &'static str,
) -> Result<T, OpenHandsError>
where
    T: DeserializeOwned,
{
    let Some(json) = read_success_body(response, operation).await? else {
        return Err(OpenHandsError::protocol(
            operation,
            "expected JSON response body",
        ));
    };
    match serde_json::from_value(json) {
        Ok(decoded) => Ok(decoded),
        Err(decode_error) => Err(OpenHandsError::protocol(operation, decode_error)),
    }
}
async fn decode_accepted(
response: reqwest::Response,
operation: &'static str,
) -> Result<AcceptedResponse, OpenHandsError> {
let Some(value) = read_success_body(response, operation).await? else {
return Ok(AcceptedResponse::accepted());
};
let accepted: AcceptedResponse = serde_json::from_value(value)
.map_err(|error| OpenHandsError::protocol(operation, error))?;
if accepted.success {
Ok(accepted)
} else {
Err(OpenHandsError::protocol(
operation,
"response reported `success=false`",
))
}
}
/// Parses a WebSocket text payload as an `EventEnvelope`.
///
/// # Errors
/// Returns `OpenHandsError::MalformedWebSocketEvent` with the decoder message
/// and the first 160 characters of the payload.
fn parse_text_event(payload: &str) -> Result<EventEnvelope, OpenHandsError> {
    match serde_json::from_str(payload) {
        Ok(envelope) => Ok(envelope),
        Err(decode_error) => Err(OpenHandsError::MalformedWebSocketEvent {
            detail: decode_error.to_string(),
            snippet: payload.chars().take(160).collect(),
        }),
    }
}
/// Parses a WebSocket binary payload as an `EventEnvelope`.
///
/// # Errors
/// Returns `OpenHandsError::MalformedWebSocketEvent` with the decoder message
/// and the first 160 characters of the payload, decoded lossily as UTF-8.
fn parse_binary_event(payload: &[u8]) -> Result<EventEnvelope, OpenHandsError> {
    match serde_json::from_slice(payload) {
        Ok(envelope) => Ok(envelope),
        Err(decode_error) => {
            let lossy_text = String::from_utf8_lossy(payload);
            Err(OpenHandsError::MalformedWebSocketEvent {
                detail: decode_error.to_string(),
                snippet: lossy_text.chars().take(160).collect(),
            })
        }
    }
}
fn parse_header_name(name: &str) -> Result<HeaderName, OpenHandsError> {
HeaderName::from_bytes(name.as_bytes()).map_err(|error| {
OpenHandsError::invalid_configuration(format!("invalid auth header name `{name}`: {error}"))
})
}
fn parse_header_value(value: &str) -> Result<HeaderValue, OpenHandsError> {
HeaderValue::from_str(value).map_err(|error| {
OpenHandsError::invalid_configuration(format!("invalid auth header value: {error}"))
})
}
/// Waits for the first conversation-state-update event on `stream`.
///
/// A single deadline is computed up front, so `wait_timeout` bounds the whole
/// wait rather than each individual read. Frames that cannot be decoded,
/// events of any other kind, and ping/pong/raw frames are logged at debug
/// level and skipped.
///
/// # Errors
/// * `OpenHandsError::ReadinessTimeout` when the deadline passes.
/// * `OpenHandsError::WebSocketClosed` on a close frame or end of stream.
/// * A websocket transport error when reading from the stream fails.
async fn wait_for_readiness_on_stream(
    stream: &mut RuntimeSocket,
    wait_timeout: Duration,
) -> Result<EventEnvelope, OpenHandsError> {
    let deadline = Instant::now() + wait_timeout;
    loop {
        let Ok(frame) = timeout_at(deadline, stream.next()).await else {
            return Err(OpenHandsError::ReadinessTimeout(wait_timeout));
        };

        let message = match frame {
            None => return Err(OpenHandsError::WebSocketClosed),
            Some(Err(error)) => {
                return Err(OpenHandsError::websocket_transport(
                    "wait for readiness",
                    error,
                ));
            }
            Some(Ok(message)) => message,
        };

        // Reduce the frame to a decoded event, or skip to the next frame.
        let event = match message {
            Message::Close(_) => return Err(OpenHandsError::WebSocketClosed),
            Message::Ping(_) | Message::Pong(_) => {
                debug!("ignoring websocket control frame before readiness");
                continue;
            }
            Message::Frame(_) => {
                debug!("ignoring raw websocket frame before readiness");
                continue;
            }
            Message::Text(payload) => match parse_text_event(&payload) {
                Ok(event) => event,
                Err(error) => {
                    debug!(error = %error, "ignoring undecodable websocket text frame before readiness");
                    continue;
                }
            },
            Message::Binary(payload) => match parse_binary_event(&payload) {
                Ok(event) => event,
                Err(error) => {
                    debug!(error = %error, "ignoring undecodable websocket binary frame before readiness");
                    continue;
                }
            },
        };

        if event.kind == CONVERSATION_STATE_UPDATE_EVENT_KIND {
            return Ok(event);
        }
        debug!(event_kind = %event.kind, "ignoring non-readiness websocket event");
    }
}
/// Reads frames from `stream` until one decodes into an event, the stream
/// closes, or a transport error occurs.
///
/// Undecodable text/binary frames and ping/pong/raw frames are logged at
/// debug level and skipped. A close frame and end of stream both map to
/// `StreamRead::Closed`.
async fn read_next_socket_event(stream: &mut RuntimeSocket) -> StreamRead {
    loop {
        let message = match stream.next().await {
            None => return StreamRead::Closed,
            Some(Err(error)) => {
                return StreamRead::Transport(OpenHandsError::websocket_transport(
                    "read runtime event",
                    error,
                ));
            }
            Some(Ok(message)) => message,
        };

        // Reduce the frame to a decoded event, or skip to the next frame.
        let event = match message {
            Message::Close(_) => return StreamRead::Closed,
            Message::Ping(_) | Message::Pong(_) => {
                debug!("ignoring websocket control frame during streaming");
                continue;
            }
            Message::Frame(_) => {
                debug!("ignoring raw websocket frame during streaming");
                continue;
            }
            Message::Text(payload) => match parse_text_event(&payload) {
                Ok(event) => event,
                Err(error) => {
                    debug!(error = %error, "ignoring undecodable websocket text frame during streaming");
                    continue;
                }
            },
            Message::Binary(payload) => match parse_binary_event(&payload) {
                Ok(event) => event,
                Err(error) => {
                    debug!(error = %error, "ignoring undecodable websocket binary frame during streaming");
                    continue;
                }
            },
        };

        debug!(event_kind = %event.kind, event_id = %event.id, source = %event.source, "received websocket event");
        return StreamRead::Event(event);
    }
}
/// Attempts to read one event from `stream`, giving up as soon as a
/// `yield_now()` completes before the stream produces a message.
///
/// Returns:
/// * `None` when the `yield_now()` branch of the `select!` wins.
/// * `Some(StreamRead::Event(..))` for the first text/binary frame that decodes.
/// * `Some(StreamRead::Closed)` on a close frame or end of stream.
/// * `Some(StreamRead::Transport(..))` when reading from the stream fails.
///
/// Undecodable text/binary frames and ping/pong/raw frames are logged at
/// debug level and skipped; the loop then races the stream against a fresh
/// `yield_now()` again.
async fn read_buffered_socket_event(stream: &mut RuntimeSocket) -> Option<StreamRead> {
loop {
// `biased` is specified so the stream branch is listed (and, per tokio's
// `select!` documentation, polled) ahead of the yield branch. The outer
// `Option` records which branch completed: `Some(..)` = stream, `None` = yield.
let next_message = tokio::select! {
biased;
message = stream.next() => Some(message),
() = yield_now() => None,
};
match next_message {
// The yield branch completed first: report that nothing was read.
None => return None,
Some(Some(Ok(Message::Text(payload)))) => match parse_text_event(&payload) {
Ok(event) => return Some(StreamRead::Event(event)),
Err(error) => {
debug!(
error = %error,
"ignoring undecodable websocket text frame while draining buffered events"
);
}
},
Some(Some(Ok(Message::Binary(payload)))) => match parse_binary_event(&payload) {
Ok(event) => return Some(StreamRead::Event(event)),
Err(error) => {
debug!(
error = %error,
"ignoring undecodable websocket binary frame while draining buffered events"
);
}
},
Some(Some(Ok(Message::Ping(_)))) | Some(Some(Ok(Message::Pong(_)))) => {
debug!("ignoring websocket control frame while draining buffered events");
}
Some(Some(Ok(Message::Frame(_)))) => {
debug!("ignoring raw websocket frame while draining buffered events");
}
// `Some(None)` means the stream itself ended; treated like a close frame.
Some(Some(Ok(Message::Close(_)))) | Some(None) => return Some(StreamRead::Closed),
Some(Some(Err(error))) => {
return Some(StreamRead::Transport(OpenHandsError::websocket_transport(
"read runtime event",
error,
)));
}
}
}
}
/// Drives `stream` until the probe run reaches a terminal execution status or
/// `wait_timeout` elapses.
///
/// Returns `Ok(())` when the state mirror reports `Finished` and
/// `confirm_finished_probe_terminal_state` confirms it, or when polling fails
/// while the mirror already reports `Finished`.
///
/// # Errors
/// * `OpenHandsError::ProbeRunUnhealthy` when a conversation error event is
///   pending or received, or when the terminal status is `Error` or `Stuck`.
/// * `OpenHandsError::ProbeActivityTimeout` when the deadline passes while
///   waiting for the next event.
/// * The polling error itself when `poll_next_event_once` fails and the mirror
///   reports no terminal status.
async fn wait_for_probe_terminal_state(
stream: &mut RuntimeEventStream,
wait_timeout: Duration,
) -> Result<(), OpenHandsError> {
// One deadline for the whole wait: `wait_timeout` is not reset per event.
let deadline = Instant::now() + wait_timeout;
loop {
// A conversation error already queued in the stream fails the probe
// before the terminal status is consulted.
if let Some(event) = stream.pending_conversation_error_event() {
return Err(OpenHandsError::ProbeRunUnhealthy(format!(
"received {} {} before a successful terminal status",
event.kind, event.id
)));
}
match stream.state_mirror().terminal_status() {
Some(TerminalExecutionStatus::Finished) => {
// A `false` confirmation means the mirror no longer reports a
// terminal status after absorbing buffered events; keep polling.
if confirm_finished_probe_terminal_state(stream).await? {
return Ok(());
}
}
Some(TerminalExecutionStatus::Error) | Some(TerminalExecutionStatus::Stuck) => {
return Err(OpenHandsError::ProbeRunUnhealthy(format!(
"terminal execution_status `{}`",
stream.state_mirror().execution_status().unwrap_or_default()
)));
}
None => {}
}
let next_event = timeout_at(deadline, stream.poll_next_event_once())
.await
.map_err(|_| OpenHandsError::ProbeActivityTimeout(wait_timeout))?;
let next_event = match next_event {
Ok(next_event) => next_event,
// On a polling failure the mirror's terminal status, if any, decides
// the outcome; the polling error is surfaced only when there is none.
Err(error) => match stream.state_mirror().terminal_status() {
Some(TerminalExecutionStatus::Finished) => return Ok(()),
Some(TerminalExecutionStatus::Error) | Some(TerminalExecutionStatus::Stuck) => {
return Err(OpenHandsError::ProbeRunUnhealthy(format!(
"terminal execution_status `{}`",
stream.state_mirror().execution_status().unwrap_or_default()
)));
}
None => return Err(error),
},
};
// No event was produced by this poll; loop back and re-check the state.
let Some(event) = next_event else {
continue;
};
if matches!(
KnownEvent::from_envelope(&event),
KnownEvent::ConversationError(_)
) {
return Err(OpenHandsError::ProbeRunUnhealthy(format!(
"received {} {} before a successful terminal status",
event.kind, event.id
)));
}
}
}
async fn confirm_finished_probe_terminal_state(
stream: &mut RuntimeEventStream,
) -> Result<bool, OpenHandsError> {
yield_now().await;
if let Err(error) = stream.absorb_buffered_socket_events().await {
return match stream.state_mirror().terminal_status() {
Some(TerminalExecutionStatus::Finished) => Ok(true),
Some(TerminalExecutionStatus::Error) | Some(TerminalExecutionStatus::Stuck) => {
Err(OpenHandsError::ProbeRunUnhealthy(format!(
"terminal execution_status `{}`",
stream.state_mirror().execution_status().unwrap_or_default()
)))
}
None => Err(error),
};
}
if let Some(event) = stream.pending_conversation_error_event() {
return Err(OpenHandsError::ProbeRunUnhealthy(format!(
"received {} {} before a successful terminal status",
event.kind, event.id
)));
}
match stream.state_mirror().terminal_status() {
Some(TerminalExecutionStatus::Finished) => Ok(true),
Some(TerminalExecutionStatus::Error) | Some(TerminalExecutionStatus::Stuck) => {
Err(OpenHandsError::ProbeRunUnhealthy(format!(
"terminal execution_status `{}`",
stream.state_mirror().execution_status().unwrap_or_default()
)))
}
None => Ok(false),
}
}
impl RuntimeEventStream {
    /// Returns the first event in `pending_events` that classifies as
    /// `KnownEvent::ConversationError`, or `None` when there is no such event.
    fn pending_conversation_error_event(&self) -> Option<&EventEnvelope> {
        /// True when `envelope` classifies as a conversation error.
        fn is_conversation_error(envelope: &EventEnvelope) -> bool {
            matches!(
                KnownEvent::from_envelope(envelope),
                KnownEvent::ConversationError(_)
            )
        }

        self.pending_events
            .iter()
            .find(|candidate| is_conversation_error(candidate))
    }
}