Skip to main content

iii_sdk/
iii.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{
4        Arc, Mutex, MutexGuard,
5        atomic::{AtomicBool, AtomicUsize, Ordering},
6    },
7    time::Duration,
8};
9
10/// Extension trait for Mutex that recovers from poisoning instead of panicking.
11/// This is safe when the protected data is still valid after a panic in another thread.
12trait MutexExt<T> {
13    fn lock_or_recover(&self) -> MutexGuard<'_, T>;
14}
15
16impl<T> MutexExt<T> for Mutex<T> {
17    fn lock_or_recover(&self) -> MutexGuard<'_, T> {
18        self.lock().unwrap_or_else(|e| e.into_inner())
19    }
20}
21
22use futures_util::{SinkExt, StreamExt};
23use serde::{Deserialize, Serialize};
24use serde_json::Value;
25use tokio::{
26    sync::{mpsc, oneshot},
27    time::sleep,
28};
29use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
30use uuid::Uuid;
31
32const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
33
34use crate::{
35    channels::{ChannelReader, ChannelWriter, StreamChannelRef},
36    error::IIIError,
37    protocol::{
38        ErrorBody, HttpInvocationConfig, Message, RegisterFunctionMessage, RegisterServiceMessage,
39        RegisterTriggerInput, RegisterTriggerMessage, RegisterTriggerTypeMessage, TriggerAction,
40        TriggerRequest, UnregisterTriggerMessage, UnregisterTriggerTypeMessage,
41    },
42    triggers::{Trigger, TriggerConfig, TriggerHandler},
43    types::{Channel, RemoteFunctionData, RemoteFunctionHandler, RemoteTriggerTypeData},
44};
45
46#[cfg(feature = "otel")]
47use crate::telemetry;
48#[cfg(feature = "otel")]
49use crate::telemetry::types::OtelConfig;
50
51const DEFAULT_TIMEOUT_MS: u64 = 30_000;
52
53/// Worker information returned by `engine::workers::list`
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct WorkerInfo {
56    pub id: String,
57    pub name: Option<String>,
58    pub runtime: Option<String>,
59    pub version: Option<String>,
60    pub os: Option<String>,
61    pub ip_address: Option<String>,
62    pub status: String,
63    pub connected_at_ms: u64,
64    pub function_count: usize,
65    pub functions: Vec<String>,
66    pub active_invocations: usize,
67}
68
69/// Function information returned by `engine::functions::list`
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct FunctionInfo {
72    pub function_id: String,
73    pub description: Option<String>,
74    pub request_format: Option<Value>,
75    pub response_format: Option<Value>,
76    pub metadata: Option<Value>,
77}
78
79/// Trigger information returned by `engine::triggers::list`
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct TriggerInfo {
82    pub id: String,
83    pub trigger_type: String,
84    pub function_id: String,
85    pub config: Value,
86}
87
88/// Telemetry metadata provided by the SDK to the engine.
89#[derive(Debug, Clone, Serialize, Deserialize, Default)]
90pub struct WorkerTelemetryMeta {
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub language: Option<String>,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub project_name: Option<String>,
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub framework: Option<String>,
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub amplitude_api_key: Option<String>,
99}
100
101/// Worker metadata for auto-registration
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct WorkerMetadata {
104    pub runtime: String,
105    pub version: String,
106    pub name: String,
107    pub os: String,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub pid: Option<u32>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub telemetry: Option<WorkerTelemetryMeta>,
112}
113
114impl Default for WorkerMetadata {
115    fn default() -> Self {
116        let hostname = hostname::get()
117            .map(|h| h.to_string_lossy().to_string())
118            .unwrap_or_else(|_| "unknown".to_string());
119        let pid = std::process::id();
120        let os_info = format!(
121            "{} {} ({})",
122            std::env::consts::OS,
123            std::env::consts::ARCH,
124            std::env::consts::FAMILY
125        );
126
127        let language = std::env::var("LANG")
128            .or_else(|_| std::env::var("LC_ALL"))
129            .ok()
130            .filter(|s| !s.is_empty())
131            .map(|s| s.split('.').next().unwrap_or(&s).to_string());
132
133        Self {
134            runtime: "rust".to_string(),
135            version: SDK_VERSION.to_string(),
136            name: format!("{}:{}", hostname, pid),
137            os: os_info,
138            pid: Some(pid),
139            telemetry: Some(WorkerTelemetryMeta {
140                language,
141                ..Default::default()
142            }),
143        }
144    }
145}
146
147#[allow(clippy::large_enum_variant)]
148enum Outbound {
149    Message(Message),
150    Shutdown,
151}
152
153type PendingInvocation = oneshot::Sender<Result<Value, IIIError>>;
154
155// WebSocket transmitter type alias
156type WsTx = futures_util::stream::SplitSink<
157    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
158    WsMessage,
159>;
160
161/// Inject trace context headers for outbound messages.
162/// Returns (traceparent, baggage) - both None when otel feature is disabled.
163#[cfg(feature = "otel")]
164fn inject_trace_headers() -> (Option<String>, Option<String>) {
165    use crate::telemetry::context;
166    (context::inject_traceparent(), context::inject_baggage())
167}
168
169#[cfg(not(feature = "otel"))]
170fn inject_trace_headers() -> (Option<String>, Option<String>) {
171    (None, None)
172}
173
174/// Connection state for the III WebSocket client
175#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176pub enum IIIConnectionState {
177    Disconnected,
178    Connecting,
179    Connected,
180    Reconnecting,
181    Failed,
182}
183
184/// Callback function type for functions available events
185pub type FunctionsAvailableCallback = Arc<dyn Fn(Vec<FunctionInfo>) + Send + Sync>;
186
187#[derive(Clone)]
188pub struct FunctionRef {
189    pub id: String,
190    unregister_fn: Arc<dyn Fn() + Send + Sync>,
191}
192
193impl FunctionRef {
194    pub fn unregister(&self) {
195        (self.unregister_fn)();
196    }
197}
198
199pub trait IntoFunctionHandler {
200    fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler>;
201}
202
203/// Trait for types that can be passed to [`III::register_function`].
204///
205/// Implemented for:
206/// - [`RegisterFunction`] — the builder API (recommended)
207/// - `(RegisterFunctionMessage, H)` — the legacy tuple API
208pub trait IntoFunctionRegistration {
209    fn into_registration(self) -> (RegisterFunctionMessage, Option<RemoteFunctionHandler>);
210}
211
212impl IntoFunctionRegistration for RegisterFunction {
213    fn into_registration(self) -> (RegisterFunctionMessage, Option<RemoteFunctionHandler>) {
214        (self.message, Some(self.handler))
215    }
216}
217
218impl<H: IntoFunctionHandler> IntoFunctionRegistration for (RegisterFunctionMessage, H) {
219    fn into_registration(self) -> (RegisterFunctionMessage, Option<RemoteFunctionHandler>) {
220        let (mut message, handler) = self;
221        let handler = handler.into_parts(&mut message);
222        (message, handler)
223    }
224}
225
226impl IntoFunctionHandler for HttpInvocationConfig {
227    fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
228        message.invocation = Some(self);
229        None
230    }
231}
232
233impl<F, Fut> IntoFunctionHandler for F
234where
235    F: Fn(Value) -> Fut + Send + Sync + 'static,
236    Fut: std::future::Future<Output = Result<Value, IIIError>> + Send + 'static,
237{
238    fn into_parts(self, _message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
239        Some(Arc::new(move |input: Value| Box::pin(self(input))))
240    }
241}
242
243// =============================================================================
244// iii_fn — sync function wrapper
245// =============================================================================
246
247/// Wrapper for registering sync functions as III handlers via [`iii_fn`].
248///
249/// Created by [`iii_fn`]. Stores a pre-erased handler so that a single
250/// [`IntoFunctionHandler`] impl covers all supported arities.
251pub struct IIIFn<F = ()> {
252    handler: RemoteFunctionHandler,
253    request_format: Option<Value>,
254    response_format: Option<Value>,
255    _marker: std::marker::PhantomData<F>,
256}
257
258fn json_schema_for<T: schemars::JsonSchema>() -> Option<Value> {
259    serde_json::to_value(
260        schemars::r#gen::SchemaSettings::draft07()
261            .into_generator()
262            .into_root_schema_for::<T>(),
263    )
264    .ok()
265}
266
267/// Helper trait used internally to convert a sync function into a
268/// [`RemoteFunctionHandler`].
269#[doc(hidden)]
270pub trait IntoSyncHandler<Marker>: Send + Sync + 'static {
271    fn into_handler(self) -> RemoteFunctionHandler;
272    fn request_format() -> Option<Value> {
273        None
274    }
275    fn response_format() -> Option<Value> {
276        None
277    }
278}
279
280// 1-arg sync — deserializes the entire JSON input as T
281impl<F, T, R, E> IntoSyncHandler<(T, R, E)> for F
282where
283    F: Fn(T) -> Result<R, E> + Send + Sync + 'static,
284    T: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
285    R: serde::Serialize + schemars::JsonSchema + Send + 'static,
286    E: std::fmt::Display + Send + 'static,
287{
288    fn into_handler(self) -> RemoteFunctionHandler {
289        Arc::new(move |input: Value| {
290            let output = serde_json::from_value::<T>(input)
291                .map_err(|e| IIIError::Handler(e.to_string()))
292                .and_then(|arg| (self)(arg).map_err(|e| IIIError::Handler(e.to_string())))
293                .and_then(|val| {
294                    serde_json::to_value(&val).map_err(|e| IIIError::Handler(e.to_string()))
295                });
296            Box::pin(async move { output })
297        })
298    }
299
300    fn request_format() -> Option<Value> {
301        json_schema_for::<T>()
302    }
303
304    fn response_format() -> Option<Value> {
305        json_schema_for::<R>()
306    }
307}
308
309/// Wraps a **sync** function into an III-compatible handler.
310///
311/// The function must take a single argument implementing
312/// [`serde::de::DeserializeOwned`] and return `Result<R, E>`
313/// where `R: Serialize` and `E: Display`.
314///
315/// The entire JSON input is deserialized as the argument type.
316/// Use a `#[derive(Deserialize)]` struct for named JSON keys.
317///
318/// For async functions, use [`iii_async_fn`] instead.
319pub fn iii_fn<F, M>(f: F) -> IIIFn<F>
320where
321    F: IntoSyncHandler<M>,
322{
323    IIIFn {
324        request_format: F::request_format(),
325        response_format: F::response_format(),
326        handler: f.into_handler(),
327        _marker: std::marker::PhantomData,
328    }
329}
330
331impl<F> IntoFunctionHandler for IIIFn<F> {
332    fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
333        if message.request_format.is_none() {
334            message.request_format = self.request_format;
335        }
336        if message.response_format.is_none() {
337            message.response_format = self.response_format;
338        }
339        Some(self.handler)
340    }
341}
342
343// =============================================================================
344// iii_async_fn — async function wrapper
345// =============================================================================
346
347/// Wrapper for registering async functions as III handlers via [`iii_async_fn`].
348///
349/// Created by [`iii_async_fn`]. Stores a pre-erased handler so that a single
350/// [`IntoFunctionHandler`] impl covers all supported arities.
351pub struct IIIAsyncFn<F = ()> {
352    handler: RemoteFunctionHandler,
353    request_format: Option<Value>,
354    response_format: Option<Value>,
355    _marker: std::marker::PhantomData<F>,
356}
357
358/// Helper trait used internally to convert an async function into a
359/// [`RemoteFunctionHandler`].
360#[doc(hidden)]
361pub trait IntoAsyncHandler<Marker>: Send + Sync + 'static {
362    fn into_handler(self) -> RemoteFunctionHandler;
363    fn request_format() -> Option<Value> {
364        None
365    }
366    fn response_format() -> Option<Value> {
367        None
368    }
369}
370
371// 1-arg async — deserializes the entire JSON input as T
372impl<F, T, Fut, R, E> IntoAsyncHandler<(T, Fut, R, E)> for F
373where
374    F: Fn(T) -> Fut + Send + Sync + 'static,
375    T: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
376    Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
377    R: serde::Serialize + schemars::JsonSchema + Send + 'static,
378    E: std::fmt::Display + Send + 'static,
379{
380    fn into_handler(self) -> RemoteFunctionHandler {
381        Arc::new(
382            move |input: Value| -> std::pin::Pin<
383                Box<dyn std::future::Future<Output = Result<Value, IIIError>> + Send>,
384            > {
385                match serde_json::from_value::<T>(input) {
386                    Ok(arg) => {
387                        let fut = (self)(arg);
388                        Box::pin(async move {
389                            fut.await
390                                .map_err(|e| IIIError::Handler(e.to_string()))
391                                .and_then(|val| {
392                                    serde_json::to_value(&val)
393                                        .map_err(|e| IIIError::Handler(e.to_string()))
394                                })
395                        })
396                    }
397                    Err(e) => Box::pin(async move { Err(IIIError::Handler(e.to_string())) }),
398                }
399            },
400        )
401    }
402
403    fn request_format() -> Option<Value> {
404        json_schema_for::<T>()
405    }
406
407    fn response_format() -> Option<Value> {
408        json_schema_for::<R>()
409    }
410}
411
412/// Wraps an **async** function into an III-compatible handler.
413///
414/// The function must take a single argument implementing
415/// [`serde::de::DeserializeOwned`] and return
416/// `impl Future<Output = Result<R, E>>` where `R: Serialize` and `E: Display`.
417pub fn iii_async_fn<F, M>(f: F) -> IIIAsyncFn<F>
418where
419    F: IntoAsyncHandler<M>,
420{
421    IIIAsyncFn {
422        request_format: F::request_format(),
423        response_format: F::response_format(),
424        handler: f.into_handler(),
425        _marker: std::marker::PhantomData,
426    }
427}
428
429impl<F> IntoFunctionHandler for IIIAsyncFn<F> {
430    fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
431        if message.request_format.is_none() {
432            message.request_format = self.request_format;
433        }
434        if message.response_format.is_none() {
435            message.response_format = self.response_format;
436        }
437        Some(self.handler)
438    }
439}
440
441// =============================================================================
442// RegisterFunction — one-step registration builder
443// =============================================================================
444
445/// One-step function registration combining ID, handler, and auto-generated schemas.
446///
447/// Use [`RegisterFunction::new`] for sync functions or [`RegisterFunction::new_async`]
448/// for async functions, then register with [`III::register`].
449pub struct RegisterFunction {
450    message: RegisterFunctionMessage,
451    handler: RemoteFunctionHandler,
452}
453
454impl RegisterFunction {
455    /// Create a registration for a **sync** function.
456    pub fn new<F, M>(id: impl Into<String>, f: F) -> Self
457    where
458        F: IntoSyncHandler<M>,
459    {
460        Self {
461            message: RegisterFunctionMessage {
462                id: id.into(),
463                description: None,
464                request_format: F::request_format(),
465                response_format: F::response_format(),
466                metadata: None,
467                invocation: None,
468            },
469            handler: f.into_handler(),
470        }
471    }
472
473    /// Create a registration for an **async** function.
474    pub fn new_async<F, M>(id: impl Into<String>, f: F) -> Self
475    where
476        F: IntoAsyncHandler<M>,
477    {
478        Self {
479            message: RegisterFunctionMessage {
480                id: id.into(),
481                description: None,
482                request_format: F::request_format(),
483                response_format: F::response_format(),
484                metadata: None,
485                invocation: None,
486            },
487            handler: f.into_handler(),
488        }
489    }
490
491    /// Set the function description.
492    pub fn description(mut self, desc: impl Into<String>) -> Self {
493        self.message.description = Some(desc.into());
494        self
495    }
496
497    /// Set function metadata.
498    pub fn metadata(mut self, meta: Value) -> Self {
499        self.message.metadata = Some(meta);
500        self
501    }
502
503    /// Get the auto-generated request format.
504    pub fn request_format(&self) -> Option<&Value> {
505        self.message.request_format.as_ref()
506    }
507
508    /// Get the auto-generated response format.
509    pub fn response_format(&self) -> Option<&Value> {
510        self.message.response_format.as_ref()
511    }
512}
513
514struct IIIInner {
515    address: String,
516    outbound: mpsc::UnboundedSender<Outbound>,
517    receiver: Mutex<Option<mpsc::UnboundedReceiver<Outbound>>>,
518    running: AtomicBool,
519    started: AtomicBool,
520    pending: Mutex<HashMap<Uuid, PendingInvocation>>,
521    functions: Mutex<HashMap<String, RemoteFunctionData>>,
522    trigger_types: Mutex<HashMap<String, RemoteTriggerTypeData>>,
523    triggers: Mutex<HashMap<String, RegisterTriggerMessage>>,
524    services: Mutex<HashMap<String, RegisterServiceMessage>>,
525    worker_metadata: Mutex<Option<WorkerMetadata>>,
526    connection_state: Mutex<IIIConnectionState>,
527    functions_available_callbacks: Mutex<HashMap<usize, FunctionsAvailableCallback>>,
528    functions_available_callback_counter: AtomicUsize,
529    functions_available_function_id: Mutex<Option<String>>,
530    functions_available_trigger: Mutex<Option<Trigger>>,
531    #[cfg(feature = "otel")]
532    otel_config: Mutex<Option<OtelConfig>>,
533}
534
535/// WebSocket client for communication with the III Engine.
536///
537/// Create with [`register_worker`](crate::register_worker).
538#[derive(Clone)]
539pub struct III {
540    inner: Arc<IIIInner>,
541}
542
543/// Guard that unsubscribes from functions available events when dropped
544pub struct FunctionsAvailableGuard {
545    iii: III,
546    callback_id: usize,
547}
548
549impl Drop for FunctionsAvailableGuard {
550    fn drop(&mut self) {
551        let mut callbacks = self
552            .iii
553            .inner
554            .functions_available_callbacks
555            .lock_or_recover();
556        callbacks.remove(&self.callback_id);
557
558        if callbacks.is_empty() {
559            let mut trigger = self.iii.inner.functions_available_trigger.lock_or_recover();
560            if let Some(trigger) = trigger.take() {
561                trigger.unregister();
562            }
563        }
564    }
565}
566
567impl III {
568    /// Create a new III with default worker metadata (auto-detected runtime, os, hostname)
569    pub fn new(address: &str) -> Self {
570        Self::with_metadata(address, WorkerMetadata::default())
571    }
572
573    /// Create a new III with custom worker metadata
574    pub fn with_metadata(address: &str, metadata: WorkerMetadata) -> Self {
575        let (tx, rx) = mpsc::unbounded_channel();
576        let inner = IIIInner {
577            address: address.into(),
578            outbound: tx,
579            receiver: Mutex::new(Some(rx)),
580            running: AtomicBool::new(false),
581            started: AtomicBool::new(false),
582            pending: Mutex::new(HashMap::new()),
583            functions: Mutex::new(HashMap::new()),
584            trigger_types: Mutex::new(HashMap::new()),
585            triggers: Mutex::new(HashMap::new()),
586            services: Mutex::new(HashMap::new()),
587            worker_metadata: Mutex::new(Some(metadata)),
588            connection_state: Mutex::new(IIIConnectionState::Disconnected),
589            functions_available_callbacks: Mutex::new(HashMap::new()),
590            functions_available_callback_counter: AtomicUsize::new(0),
591            functions_available_function_id: Mutex::new(None),
592            functions_available_trigger: Mutex::new(None),
593            #[cfg(feature = "otel")]
594            otel_config: Mutex::new(None),
595        };
596        Self {
597            inner: Arc::new(inner),
598        }
599    }
600
601    /// Get the engine WebSocket address this client connects to.
602    pub fn address(&self) -> &str {
603        &self.inner.address
604    }
605
606    /// Set custom worker metadata (call before connect)
607    pub fn set_metadata(&self, metadata: WorkerMetadata) {
608        *self.inner.worker_metadata.lock_or_recover() = Some(metadata);
609    }
610
611    /// Set OpenTelemetry configuration (call before connect)
612    #[cfg(feature = "otel")]
613    pub fn set_otel_config(&self, config: OtelConfig) {
614        *self.inner.otel_config.lock_or_recover() = Some(config);
615    }
616
617    pub(crate) fn connect(&self) {
618        if self.inner.started.swap(true, Ordering::SeqCst) {
619            return;
620        }
621
622        let receiver = self.inner.receiver.lock_or_recover().take();
623        let Some(rx) = receiver else { return };
624
625        self.inner.running.store(true, Ordering::SeqCst);
626
627        let iii = self.clone();
628        tokio::spawn(async move {
629            iii.run_connection(rx).await;
630        });
631
632        #[cfg(feature = "otel")]
633        {
634            let config = self.inner.otel_config.lock_or_recover().take();
635            if let Some(mut config) = config {
636                if config.engine_ws_url.is_none() {
637                    config.engine_ws_url = Some(self.inner.address.clone());
638                }
639                tokio::spawn(async move {
640                    telemetry::init_otel(config).await;
641                });
642            }
643        }
644    }
645
646    /// Shutdown the III client.
647    ///
648    /// This stops the connection loop and sends a shutdown signal.
649    /// If the `otel` feature is enabled, this will spawn a background task
650    /// to flush telemetry data, but does NOT wait for it to complete.
651    /// For guaranteed telemetry flush, use `shutdown_async()` instead.
652    #[deprecated(note = "Use shutdown_async() for guaranteed telemetry flush")]
653    pub fn shutdown(&self) {
654        self.inner.running.store(false, Ordering::SeqCst);
655        let _ = self.inner.outbound.send(Outbound::Shutdown);
656        self.set_connection_state(IIIConnectionState::Disconnected);
657
658        #[cfg(feature = "otel")]
659        {
660            tracing::warn!(
661                "shutdown() does not await telemetry flush; use shutdown_async() instead"
662            );
663            tokio::spawn(async {
664                telemetry::shutdown_otel().await;
665            });
666        }
667    }
668
669    /// Shutdown the III client and flush all pending telemetry data.
670    ///
671    /// This method stops the connection loop and sends a shutdown signal.
672    /// When the `otel` feature is enabled, it additionally awaits the
673    /// OpenTelemetry flush, ensuring all spans, metrics, and logs are
674    /// exported before returning.
675    pub async fn shutdown_async(&self) {
676        self.inner.running.store(false, Ordering::SeqCst);
677        let _ = self.inner.outbound.send(Outbound::Shutdown);
678        self.set_connection_state(IIIConnectionState::Disconnected);
679
680        #[cfg(feature = "otel")]
681        telemetry::shutdown_otel().await;
682    }
683
684    fn register_function_inner(
685        &self,
686        message: RegisterFunctionMessage,
687        handler: Option<RemoteFunctionHandler>,
688    ) -> FunctionRef {
689        let id = message.id.clone();
690        if id.trim().is_empty() {
691            panic!("id is required");
692        }
693        let data = RemoteFunctionData {
694            message: message.clone(),
695            handler,
696        };
697        let mut funcs = self.inner.functions.lock_or_recover();
698        match funcs.entry(id.clone()) {
699            std::collections::hash_map::Entry::Occupied(_) => {
700                panic!("function id '{}' already registered", id);
701            }
702            std::collections::hash_map::Entry::Vacant(entry) => {
703                entry.insert(data);
704            }
705        }
706        drop(funcs);
707        let _ = self.send_message(message.to_message());
708
709        let iii = self.clone();
710        let unregister_id = id.clone();
711        let unregister_fn = Arc::new(move || {
712            let _ = iii.inner.functions.lock_or_recover().remove(&unregister_id);
713            let _ = iii.send_message(Message::UnregisterFunction {
714                id: unregister_id.clone(),
715            });
716        });
717
718        FunctionRef { id, unregister_fn }
719    }
720
721    /// Register a function with the engine.
722    ///
723    /// Pass a closure/async fn for local execution, or an [`HttpInvocationConfig`]
724    /// for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).
725    ///
726    /// # Arguments
727    /// * `message` - Function registration message with id and optional metadata.
728    /// * `handler` - Async handler or HTTP invocation config.
729    ///
730    /// # Panics
731    /// Panics if `id` is empty or already registered.
732    ///
733    /// # Examples
734    /// ```rust,no_run
735    /// use iii_sdk::{register_worker, InitOptions, RegisterFunction};
736    /// use serde::Deserialize;
737    /// use schemars::JsonSchema;
738    ///
739    /// #[derive(Deserialize, JsonSchema)]
740    /// struct Input { name: String }
741    /// fn greet(input: Input) -> Result<String, String> {
742    ///     Ok(format!("Hello, {}!", input.name))
743    /// }
744    ///
745    /// let iii = register_worker("ws://localhost:49134", InitOptions::default());
746    /// iii.register_function(RegisterFunction::new("greet", greet));
747    /// ```
748    ///
749    /// Also accepts a two-argument form via [`register_function_with`](III::register_function_with):
750    /// ```rust,no_run
751    /// # use iii_sdk::{register_worker, InitOptions, RegisterFunctionMessage};
752    /// # use serde_json::{json, Value};
753    /// # let iii = register_worker("ws://localhost:49134", InitOptions::default());
754    /// iii.register_function_with(
755    ///     RegisterFunctionMessage::with_id("echo".to_string()),
756    ///     |input: Value| async move { Ok(json!({"echo": input})) },
757    /// );
758    /// ```
759    pub fn register_function<R: IntoFunctionRegistration>(&self, registration: R) -> FunctionRef {
760        let (message, handler) = registration.into_registration();
761        self.register_function_inner(message, handler)
762    }
763
764    /// Register a function with a message and handler directly.
765    pub fn register_function_with<H: IntoFunctionHandler>(
766        &self,
767        mut message: RegisterFunctionMessage,
768        handler: H,
769    ) -> FunctionRef {
770        let handler = handler.into_parts(&mut message);
771        self.register_function_inner(message, handler)
772    }
773
774    /// Register a service with the engine.
775    ///
776    /// # Arguments
777    /// * `message` - Service registration message with id, name, and optional metadata.
778    pub fn register_service(&self, message: RegisterServiceMessage) {
779        self.inner
780            .services
781            .lock_or_recover()
782            .insert(message.id.clone(), message.clone());
783        let _ = self.send_message(message.to_message());
784    }
785
786    /// Register a custom trigger type with the engine.
787    ///
788    /// # Arguments
789    /// * `id` - Unique trigger type identifier.
790    /// * `description` - Human-readable description.
791    /// * `handler` - Handler implementing [`TriggerHandler`].
792    pub fn register_trigger_type<H>(
793        &self,
794        id: impl Into<String>,
795        description: impl Into<String>,
796        handler: H,
797    ) where
798        H: TriggerHandler + 'static,
799    {
800        let message = RegisterTriggerTypeMessage {
801            id: id.into(),
802            description: description.into(),
803        };
804
805        self.inner.trigger_types.lock_or_recover().insert(
806            message.id.clone(),
807            RemoteTriggerTypeData {
808                message: message.clone(),
809                handler: Arc::new(handler),
810            },
811        );
812
813        let _ = self.send_message(message.to_message());
814    }
815
816    /// Unregister a previously registered trigger type.
817    pub fn unregister_trigger_type(&self, id: impl Into<String>) {
818        let id = id.into();
819        self.inner.trigger_types.lock_or_recover().remove(&id);
820        let msg = UnregisterTriggerTypeMessage { id };
821        let _ = self.send_message(msg.to_message());
822    }
823
824    /// Bind a trigger configuration to a registered function.
825    ///
826    /// # Arguments
827    /// * `input` - Trigger registration input with trigger_type, function_id, and config.
828    ///
829    /// # Examples
830    /// ```rust
831    /// # use iii_sdk::{III, RegisterTriggerInput};
832    /// # use serde_json::json;
833    /// # let iii = III::new("ws://localhost:49134");
834    /// let trigger = iii.register_trigger(RegisterTriggerInput {
835    ///     trigger_type: "http".to_string(),
836    ///     function_id: "greet".to_string(),
837    ///     config: json!({ "api_path": "/greet", "http_method": "GET" }),
838    /// })?;
839    /// // Later...
840    /// trigger.unregister();
841    /// # Ok::<(), iii_sdk::IIIError>(())
842    /// ```
843    pub fn register_trigger(&self, input: RegisterTriggerInput) -> Result<Trigger, IIIError> {
844        let id = Uuid::new_v4().to_string();
845        let message = RegisterTriggerMessage {
846            id: id.clone(),
847            trigger_type: input.trigger_type,
848            function_id: input.function_id,
849            config: input.config,
850        };
851
852        self.inner
853            .triggers
854            .lock_or_recover()
855            .insert(message.id.clone(), message.clone());
856        let _ = self.send_message(message.to_message());
857
858        let iii = self.clone();
859        let trigger_type = message.trigger_type.clone();
860        let unregister_id = message.id.clone();
861        let unregister_fn = Arc::new(move || {
862            let _ = iii.inner.triggers.lock_or_recover().remove(&unregister_id);
863            let msg = UnregisterTriggerMessage {
864                id: unregister_id.clone(),
865                trigger_type: trigger_type.clone(),
866            };
867            let _ = iii.send_message(msg.to_message());
868        });
869
870        Ok(Trigger::new(unregister_fn))
871    }
872
873    /// Invoke a remote function.
874    ///
875    /// The routing behavior depends on the `action` field of the request:
876    /// - No action: synchronous -- waits for the function to return.
877    /// - [`TriggerAction::Enqueue`] - async via named queue.
878    /// - [`TriggerAction::Void`] — fire-and-forget.
879    ///
880    /// # Examples
881    /// ```rust
882    /// # use iii_sdk::{III, TriggerRequest, TriggerAction};
883    /// # use serde_json::json;
884    /// # async fn example(iii: &III) -> Result<(), iii_sdk::IIIError> {
885    /// // Synchronous
886    /// let result = iii.trigger(TriggerRequest {
887    ///     function_id: "greet".to_string(),
888    ///     payload: json!({"name": "World"}),
889    ///     action: None,
890    ///     timeout_ms: None,
891    /// }).await?;
892    ///
893    /// // Fire-and-forget
894    /// iii.trigger(TriggerRequest {
895    ///     function_id: "notify".to_string(),
896    ///     payload: json!({}),
897    ///     action: Some(TriggerAction::Void),
898    ///     timeout_ms: None,
899    /// }).await?;
900    ///
901    /// // Enqueue
902    /// let receipt = iii.trigger(TriggerRequest {
903    ///     function_id: "enqueue".to_string(),
904    ///     payload: json!({"topic": "test"}),
905    ///     action: Some(TriggerAction::Enqueue { queue: "test".to_string() }),
906    ///     timeout_ms: None,
907    /// }).await?;
908    ///
909    /// # Ok(())
910    /// # }
911    /// ```
912    pub async fn trigger(
913        &self,
914        request: impl Into<crate::protocol::TriggerRequest>,
915    ) -> Result<Value, IIIError> {
916        let req = request.into();
917        let (tp, bg) = inject_trace_headers();
918
919        // Void is fire-and-forget — no invocation_id, no response
920        if matches!(req.action, Some(TriggerAction::Void)) {
921            self.send_message(Message::InvokeFunction {
922                invocation_id: None,
923                function_id: req.function_id,
924                data: req.payload,
925                traceparent: tp,
926                baggage: bg,
927                action: req.action,
928            })?;
929            return Ok(Value::Null);
930        }
931
932        // Enqueue and default: use invocation_id to receive acknowledgement/result
933        let timeout = Duration::from_millis(req.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS));
934        let invocation_id = Uuid::new_v4();
935        let (tx, rx) = oneshot::channel();
936
937        self.inner
938            .pending
939            .lock_or_recover()
940            .insert(invocation_id, tx);
941
942        self.send_message(Message::InvokeFunction {
943            invocation_id: Some(invocation_id),
944            function_id: req.function_id,
945            data: req.payload,
946            traceparent: tp,
947            baggage: bg,
948            action: req.action,
949        })?;
950
951        match tokio::time::timeout(timeout, rx).await {
952            Ok(Ok(result)) => result,
953            Ok(Err(_)) => Err(IIIError::NotConnected),
954            Err(_) => {
955                self.inner.pending.lock_or_recover().remove(&invocation_id);
956                Err(IIIError::Timeout)
957            }
958        }
959    }
960
961    /// Get the current connection state.
962    pub fn get_connection_state(&self) -> IIIConnectionState {
963        *self.inner.connection_state.lock_or_recover()
964    }
965
966    fn set_connection_state(&self, state: IIIConnectionState) {
967        let mut current = self.inner.connection_state.lock_or_recover();
968        if *current == state {
969            return;
970        }
971        *current = state;
972    }
973
974    /// List all registered functions from the engine
975    pub async fn list_functions(&self) -> Result<Vec<FunctionInfo>, IIIError> {
976        let result = self
977            .trigger(TriggerRequest {
978                function_id: "engine::functions::list".to_string(),
979                payload: serde_json::json!({}),
980                action: None,
981                timeout_ms: None,
982            })
983            .await?;
984
985        let functions = result
986            .get("functions")
987            .and_then(|v| serde_json::from_value::<Vec<FunctionInfo>>(v.clone()).ok())
988            .unwrap_or_default();
989
990        Ok(functions)
991    }
992
993    /// Subscribe to function availability events
994    /// Returns a guard that will unsubscribe when dropped
995    pub fn on_functions_available<F>(&self, callback: F) -> FunctionsAvailableGuard
996    where
997        F: Fn(Vec<FunctionInfo>) + Send + Sync + 'static,
998    {
999        let callback = Arc::new(callback);
1000        let callback_id = self
1001            .inner
1002            .functions_available_callback_counter
1003            .fetch_add(1, Ordering::Relaxed);
1004
1005        self.inner
1006            .functions_available_callbacks
1007            .lock_or_recover()
1008            .insert(callback_id, callback);
1009
1010        // Set up trigger if not already done
1011        let mut trigger_guard = self.inner.functions_available_trigger.lock_or_recover();
1012        if trigger_guard.is_none() {
1013            // Get or create function path (reuse existing if trigger registration previously failed)
1014            let function_id = {
1015                let mut path_guard = self.inner.functions_available_function_id.lock_or_recover();
1016                if path_guard.is_none() {
1017                    let path = format!("iii.on_functions_available.{}", Uuid::new_v4());
1018                    *path_guard = Some(path.clone());
1019                    path
1020                } else {
1021                    path_guard.clone().unwrap()
1022                }
1023            };
1024
1025            // Register handler function only if it doesn't already exist
1026            let function_exists = self
1027                .inner
1028                .functions
1029                .lock_or_recover()
1030                .contains_key(&function_id);
1031            if !function_exists {
1032                let iii = self.clone();
1033                self.register_function_with(
1034                    RegisterFunctionMessage {
1035                        id: function_id.clone(),
1036                        description: None,
1037                        request_format: None,
1038                        response_format: None,
1039                        metadata: None,
1040                        invocation: None,
1041                    },
1042                    move |input: Value| {
1043                        let iii = iii.clone();
1044                        async move {
1045                            let functions = input
1046                                .get("functions")
1047                                .and_then(|v| {
1048                                    serde_json::from_value::<Vec<FunctionInfo>>(v.clone()).ok()
1049                                })
1050                                .unwrap_or_default();
1051
1052                            let callbacks =
1053                                iii.inner.functions_available_callbacks.lock_or_recover();
1054                            for cb in callbacks.values() {
1055                                cb(functions.clone());
1056                            }
1057                            Ok(Value::Null)
1058                        }
1059                    },
1060                );
1061            }
1062
1063            match self.register_trigger(RegisterTriggerInput {
1064                trigger_type: "engine::functions-available".to_string(),
1065                function_id,
1066                config: serde_json::json!({}),
1067            }) {
1068                Ok(trigger) => {
1069                    *trigger_guard = Some(trigger);
1070                }
1071                Err(err) => {
1072                    tracing::warn!(error = %err, "Failed to register functions_available trigger");
1073                }
1074            }
1075        }
1076
1077        FunctionsAvailableGuard {
1078            iii: self.clone(),
1079            callback_id,
1080        }
1081    }
1082
1083    /// List all connected workers from the engine
1084    pub async fn list_workers(&self) -> Result<Vec<WorkerInfo>, IIIError> {
1085        let result = self
1086            .trigger(TriggerRequest {
1087                function_id: "engine::workers::list".to_string(),
1088                payload: serde_json::json!({}),
1089                action: None,
1090                timeout_ms: None,
1091            })
1092            .await?;
1093
1094        let workers = result
1095            .get("workers")
1096            .and_then(|v| serde_json::from_value::<Vec<WorkerInfo>>(v.clone()).ok())
1097            .unwrap_or_default();
1098
1099        Ok(workers)
1100    }
1101
1102    /// List all registered triggers from the engine
1103    pub async fn list_triggers(
1104        &self,
1105        include_internal: bool,
1106    ) -> Result<Vec<TriggerInfo>, IIIError> {
1107        let result = self
1108            .trigger(TriggerRequest {
1109                function_id: "engine::triggers::list".to_string(),
1110                payload: serde_json::json!({ "include_internal": include_internal }),
1111                action: None,
1112                timeout_ms: None,
1113            })
1114            .await?;
1115
1116        let triggers = result
1117            .get("triggers")
1118            .and_then(|v| serde_json::from_value::<Vec<TriggerInfo>>(v.clone()).ok())
1119            .unwrap_or_default();
1120
1121        Ok(triggers)
1122    }
1123
1124    /// Create a streaming channel pair for worker-to-worker data transfer.
1125    ///
1126    /// Returns a `Channel` with writer, reader, and their serializable refs
1127    /// that can be passed as fields in invocation data to other functions.
1128    pub async fn create_channel(&self, buffer_size: Option<usize>) -> Result<Channel, IIIError> {
1129        let result = self
1130            .trigger(TriggerRequest {
1131                function_id: "engine::channels::create".to_string(),
1132                payload: serde_json::json!({ "buffer_size": buffer_size }),
1133                action: None,
1134                timeout_ms: None,
1135            })
1136            .await?;
1137
1138        let writer_ref: StreamChannelRef = serde_json::from_value(
1139            result
1140                .get("writer")
1141                .cloned()
1142                .ok_or_else(|| IIIError::Serde("missing 'writer' in channel response".into()))?,
1143        )
1144        .map_err(|e| IIIError::Serde(e.to_string()))?;
1145
1146        let reader_ref: StreamChannelRef = serde_json::from_value(
1147            result
1148                .get("reader")
1149                .cloned()
1150                .ok_or_else(|| IIIError::Serde("missing 'reader' in channel response".into()))?,
1151        )
1152        .map_err(|e| IIIError::Serde(e.to_string()))?;
1153
1154        Ok(Channel {
1155            writer: ChannelWriter::new(&self.inner.address, &writer_ref),
1156            reader: ChannelReader::new(&self.inner.address, &reader_ref),
1157            writer_ref,
1158            reader_ref,
1159        })
1160    }
1161
1162    /// Register this worker's metadata with the engine (called automatically on connect)
1163    fn register_worker_metadata(&self) {
1164        if let Some(metadata) = self.inner.worker_metadata.lock_or_recover().clone() {
1165            if let Ok(value) = serde_json::to_value(metadata) {
1166                let _ = self.send_message(Message::InvokeFunction {
1167                    invocation_id: None,
1168                    function_id: "engine::workers::register".to_string(),
1169                    data: value,
1170                    traceparent: None,
1171                    baggage: None,
1172                    action: Some(TriggerAction::Void),
1173                });
1174            }
1175        }
1176    }
1177
1178    fn send_message(&self, message: Message) -> Result<(), IIIError> {
1179        if !self.inner.running.load(Ordering::SeqCst) {
1180            return Ok(());
1181        }
1182
1183        self.inner
1184            .outbound
1185            .send(Outbound::Message(message))
1186            .map_err(|_| IIIError::NotConnected)
1187    }
1188
1189    async fn run_connection(&self, mut rx: mpsc::UnboundedReceiver<Outbound>) {
1190        let mut queue: Vec<Message> = Vec::new();
1191        let mut has_connected_before = false;
1192
1193        while self.inner.running.load(Ordering::SeqCst) {
1194            self.set_connection_state(if has_connected_before {
1195                IIIConnectionState::Reconnecting
1196            } else {
1197                IIIConnectionState::Connecting
1198            });
1199
1200            match connect_async(&self.inner.address).await {
1201                Ok((stream, _)) => {
1202                    tracing::info!(address = %self.inner.address, "iii connected");
1203                    has_connected_before = true;
1204                    self.set_connection_state(IIIConnectionState::Connected);
1205                    let (mut ws_tx, mut ws_rx) = stream.split();
1206
1207                    queue.extend(self.collect_registrations());
1208                    Self::dedupe_registrations(&mut queue);
1209                    if let Err(err) = self.flush_queue(&mut ws_tx, &mut queue).await {
1210                        tracing::warn!(error = %err, "failed to flush queue");
1211                        sleep(Duration::from_secs(2)).await;
1212                        continue;
1213                    }
1214
1215                    // Auto-register worker metadata on connect (like Node SDK)
1216                    self.register_worker_metadata();
1217
1218                    let mut should_reconnect = false;
1219
1220                    while self.inner.running.load(Ordering::SeqCst) && !should_reconnect {
1221                        tokio::select! {
1222                            outgoing = rx.recv() => {
1223                                match outgoing {
1224                                    Some(Outbound::Message(message)) => {
1225                                        if let Err(err) = self.send_ws(&mut ws_tx, &message).await {
1226                                            tracing::warn!(error = %err, "send failed; reconnecting");
1227                                            queue.push(message);
1228                                            should_reconnect = true;
1229                                        }
1230                                    }
1231                                    Some(Outbound::Shutdown) => {
1232                                        self.inner.running.store(false, Ordering::SeqCst);
1233                                        return;
1234                                    }
1235                                    None => {
1236                                        self.inner.running.store(false, Ordering::SeqCst);
1237                                        return;
1238                                    }
1239                                }
1240                            }
1241                            incoming = ws_rx.next() => {
1242                                match incoming {
1243                                    Some(Ok(frame)) => {
1244                                        if let Err(err) = self.handle_frame(frame) {
1245                                            tracing::warn!(error = %err, "failed to handle frame");
1246                                        }
1247                                    }
1248                                    Some(Err(err)) => {
1249                                        tracing::warn!(error = %err, "websocket receive error");
1250                                        should_reconnect = true;
1251                                    }
1252                                    None => {
1253                                        should_reconnect = true;
1254                                    }
1255                                }
1256                            }
1257                        }
1258                    }
1259                }
1260                Err(err) => {
1261                    tracing::warn!(error = %err, "failed to connect; retrying");
1262                }
1263            }
1264
1265            if self.inner.running.load(Ordering::SeqCst) {
1266                sleep(Duration::from_secs(2)).await;
1267            }
1268        }
1269    }
1270
1271    fn collect_registrations(&self) -> Vec<Message> {
1272        let mut messages = Vec::new();
1273
1274        for trigger_type in self.inner.trigger_types.lock_or_recover().values() {
1275            messages.push(trigger_type.message.to_message());
1276        }
1277
1278        for service in self.inner.services.lock_or_recover().values() {
1279            messages.push(service.to_message());
1280        }
1281
1282        for function in self.inner.functions.lock_or_recover().values() {
1283            messages.push(function.message.to_message());
1284        }
1285
1286        for trigger in self.inner.triggers.lock_or_recover().values() {
1287            messages.push(trigger.to_message());
1288        }
1289
1290        messages
1291    }
1292
1293    fn dedupe_registrations(queue: &mut Vec<Message>) {
1294        let mut seen = HashSet::new();
1295        let mut deduped_rev = Vec::with_capacity(queue.len());
1296
1297        for message in queue.iter().rev() {
1298            let key = match message {
1299                Message::RegisterTriggerType { id, .. } => format!("trigger_type:{id}"),
1300                Message::RegisterTrigger { id, .. } => format!("trigger:{id}"),
1301                Message::RegisterFunction { id, .. } => {
1302                    format!("function:{id}")
1303                }
1304                Message::RegisterService { id, .. } => format!("service:{id}"),
1305                _ => {
1306                    deduped_rev.push(message.clone());
1307                    continue;
1308                }
1309            };
1310
1311            if seen.insert(key) {
1312                deduped_rev.push(message.clone());
1313            }
1314        }
1315
1316        deduped_rev.reverse();
1317        *queue = deduped_rev;
1318    }
1319
1320    async fn flush_queue(
1321        &self,
1322        ws_tx: &mut WsTx,
1323        queue: &mut Vec<Message>,
1324    ) -> Result<(), IIIError> {
1325        let mut drained = Vec::new();
1326        std::mem::swap(queue, &mut drained);
1327
1328        let mut iter = drained.into_iter();
1329        while let Some(message) = iter.next() {
1330            if let Err(err) = self.send_ws(ws_tx, &message).await {
1331                queue.push(message);
1332                queue.extend(iter);
1333                return Err(err);
1334            }
1335        }
1336
1337        Ok(())
1338    }
1339
1340    async fn send_ws(&self, ws_tx: &mut WsTx, message: &Message) -> Result<(), IIIError> {
1341        let payload = serde_json::to_string(message)?;
1342        ws_tx.send(WsMessage::Text(payload.into())).await?;
1343        Ok(())
1344    }
1345
1346    fn handle_frame(&self, frame: WsMessage) -> Result<(), IIIError> {
1347        match frame {
1348            WsMessage::Text(text) => self.handle_message(&text),
1349            WsMessage::Binary(bytes) => {
1350                let text = String::from_utf8_lossy(&bytes).to_string();
1351                self.handle_message(&text)
1352            }
1353            _ => Ok(()),
1354        }
1355    }
1356
1357    fn handle_message(&self, payload: &str) -> Result<(), IIIError> {
1358        let message: Message = serde_json::from_str(payload)?;
1359
1360        match message {
1361            Message::InvocationResult {
1362                invocation_id,
1363                result,
1364                error,
1365                ..
1366            } => {
1367                self.handle_invocation_result(invocation_id, result, error);
1368            }
1369            Message::InvokeFunction {
1370                invocation_id,
1371                function_id,
1372                data,
1373                traceparent,
1374                baggage,
1375                action: _,
1376            } => {
1377                self.handle_invoke_function(invocation_id, function_id, data, traceparent, baggage);
1378            }
1379            Message::RegisterTrigger {
1380                id,
1381                trigger_type,
1382                function_id,
1383                config,
1384            } => {
1385                self.handle_register_trigger(id, trigger_type, function_id, config);
1386            }
1387            Message::Ping => {
1388                let _ = self.send_message(Message::Pong);
1389            }
1390            Message::WorkerRegistered { worker_id } => {
1391                tracing::debug!(worker_id = %worker_id, "Worker registered");
1392            }
1393            _ => {}
1394        }
1395
1396        Ok(())
1397    }
1398
1399    fn handle_invocation_result(
1400        &self,
1401        invocation_id: Uuid,
1402        result: Option<Value>,
1403        error: Option<ErrorBody>,
1404    ) {
1405        let sender = self.inner.pending.lock_or_recover().remove(&invocation_id);
1406        if let Some(sender) = sender {
1407            let result = match error {
1408                Some(error) => Err(IIIError::Remote {
1409                    code: error.code,
1410                    message: error.message,
1411                    stacktrace: error.stacktrace,
1412                }),
1413                None => Ok(result.unwrap_or(Value::Null)),
1414            };
1415            let _ = sender.send(result);
1416        }
1417    }
1418
1419    fn handle_invoke_function(
1420        &self,
1421        invocation_id: Option<Uuid>,
1422        function_id: String,
1423        data: Value,
1424        traceparent: Option<String>,
1425        baggage: Option<String>,
1426    ) {
1427        tracing::debug!(function_id = %function_id, traceparent = ?traceparent, baggage = ?baggage, "Invoking function");
1428
1429        let func_data = self
1430            .inner
1431            .functions
1432            .lock_or_recover()
1433            .get(&function_id)
1434            .cloned();
1435        let handler = func_data.as_ref().and_then(|d| d.handler.clone());
1436
1437        let Some(handler) = handler else {
1438            let (code, message) = match &func_data {
1439                Some(_) => (
1440                    "function_not_invokable".to_string(),
1441                    "Function is HTTP-invoked and cannot be invoked locally".to_string(),
1442                ),
1443                None => (
1444                    "function_not_found".to_string(),
1445                    "Function not found".to_string(),
1446                ),
1447            };
1448            tracing::warn!(function_id = %function_id, "Invocation: {}", message);
1449
1450            if let Some(invocation_id) = invocation_id {
1451                let (resp_tp, resp_bg) = inject_trace_headers();
1452
1453                let error = ErrorBody {
1454                    code,
1455                    message,
1456                    stacktrace: None,
1457                };
1458                let result = self.send_message(Message::InvocationResult {
1459                    invocation_id,
1460                    function_id,
1461                    result: None,
1462                    error: Some(error),
1463                    traceparent: resp_tp,
1464                    baggage: resp_bg,
1465                });
1466
1467                if let Err(err) = result {
1468                    tracing::warn!(error = %err, "error sending invocation result");
1469                }
1470            }
1471            return;
1472        };
1473
1474        let iii = self.clone();
1475
1476        tokio::spawn(async move {
1477            // Extract incoming trace context and create a span for this invocation.
1478            // This ensures the handler and any outbound calls it makes (e.g.
1479            // invoke_function_with_timeout) are linked as children of the caller's trace.
1480            // We use FutureExt::with_context() instead of cx.attach() because
1481            // ContextGuard is !Send and can't be held across .await in tokio::spawn.
1482            #[cfg(feature = "otel")]
1483            let otel_cx = {
1484                use crate::telemetry::context::extract_context;
1485                use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer};
1486
1487                let parent_cx = extract_context(traceparent.as_deref(), baggage.as_deref());
1488                let tracer = opentelemetry::global::tracer("iii-rust-sdk");
1489                let span = tracer
1490                    .span_builder(format!("call {}", function_id))
1491                    .with_kind(SpanKind::Server)
1492                    .start_with_context(&tracer, &parent_cx);
1493                parent_cx.with_span(span)
1494            };
1495
1496            #[cfg(feature = "otel")]
1497            let result = {
1498                use opentelemetry::trace::FutureExt as OtelFutureExt;
1499                handler(data).with_context(otel_cx.clone()).await
1500            };
1501
1502            #[cfg(not(feature = "otel"))]
1503            let result = handler(data).await;
1504
1505            // Record span status based on result
1506            #[allow(unused_mut)]
1507            let mut error_stacktrace: Option<String> = None;
1508            #[cfg(feature = "otel")]
1509            {
1510                use opentelemetry::KeyValue;
1511                use opentelemetry::trace::{Status, TraceContextExt};
1512                let span = otel_cx.span();
1513                match &result {
1514                    Ok(_) => span.set_status(Status::Ok),
1515                    Err(err) => {
1516                        let (exc_type, exc_message, stacktrace) = match err {
1517                            IIIError::Remote {
1518                                code,
1519                                message,
1520                                stacktrace,
1521                            } => (
1522                                code.clone(),
1523                                message.clone(),
1524                                stacktrace.clone().unwrap_or_else(|| {
1525                                    std::backtrace::Backtrace::force_capture().to_string()
1526                                }),
1527                            ),
1528                            other => (
1529                                "InvocationError".to_string(),
1530                                other.to_string(),
1531                                std::backtrace::Backtrace::force_capture().to_string(),
1532                            ),
1533                        };
1534                        span.set_status(Status::error(exc_message.clone()));
1535                        span.add_event(
1536                            "exception",
1537                            vec![
1538                                KeyValue::new("exception.type", exc_type),
1539                                KeyValue::new("exception.message", exc_message),
1540                                KeyValue::new("exception.stacktrace", stacktrace.clone()),
1541                            ],
1542                        );
1543                        error_stacktrace = Some(stacktrace);
1544                    }
1545                }
1546            }
1547
1548            if let Some(invocation_id) = invocation_id {
1549                // Inject trace context from our span into the response.
1550                // We briefly attach the otel context (no .await crossing)
1551                // so inject_traceparent/inject_baggage can read it.
1552                #[cfg(feature = "otel")]
1553                let (resp_tp, resp_bg) = {
1554                    let _guard = otel_cx.attach();
1555                    inject_trace_headers()
1556                };
1557                #[cfg(not(feature = "otel"))]
1558                let (resp_tp, resp_bg) = inject_trace_headers();
1559
1560                let message = match result {
1561                    Ok(value) => Message::InvocationResult {
1562                        invocation_id,
1563                        function_id,
1564                        result: Some(value),
1565                        error: None,
1566                        traceparent: resp_tp,
1567                        baggage: resp_bg,
1568                    },
1569                    Err(err) => {
1570                        let error_body = match err {
1571                            IIIError::Remote {
1572                                code,
1573                                message,
1574                                stacktrace,
1575                            } => ErrorBody {
1576                                code,
1577                                message,
1578                                stacktrace: stacktrace.or(error_stacktrace).or_else(|| {
1579                                    Some(std::backtrace::Backtrace::force_capture().to_string())
1580                                }),
1581                            },
1582                            other => ErrorBody {
1583                                code: "invocation_failed".to_string(),
1584                                message: other.to_string(),
1585                                stacktrace: error_stacktrace.or_else(|| {
1586                                    Some(std::backtrace::Backtrace::force_capture().to_string())
1587                                }),
1588                            },
1589                        };
1590                        Message::InvocationResult {
1591                            invocation_id,
1592                            function_id,
1593                            result: None,
1594                            error: Some(error_body),
1595                            traceparent: resp_tp,
1596                            baggage: resp_bg,
1597                        }
1598                    }
1599                };
1600
1601                let _ = iii.send_message(message);
1602            } else if let Err(err) = result {
1603                tracing::warn!(error = %err, "error handling async invocation");
1604            }
1605        });
1606    }
1607
1608    fn handle_register_trigger(
1609        &self,
1610        id: String,
1611        trigger_type: String,
1612        function_id: String,
1613        config: Value,
1614    ) {
1615        let handler = self
1616            .inner
1617            .trigger_types
1618            .lock_or_recover()
1619            .get(&trigger_type)
1620            .map(|data| data.handler.clone());
1621
1622        let iii = self.clone();
1623
1624        tokio::spawn(async move {
1625            let message = if let Some(handler) = handler {
1626                let config = TriggerConfig {
1627                    id: id.clone(),
1628                    function_id: function_id.clone(),
1629                    config,
1630                };
1631
1632                match handler.register_trigger(config).await {
1633                    Ok(()) => Message::TriggerRegistrationResult {
1634                        id,
1635                        trigger_type,
1636                        function_id,
1637                        error: None,
1638                    },
1639                    Err(err) => Message::TriggerRegistrationResult {
1640                        id,
1641                        trigger_type,
1642                        function_id,
1643                        error: Some(ErrorBody {
1644                            code: "trigger_registration_failed".to_string(),
1645                            message: err.to_string(),
1646                            stacktrace: None,
1647                        }),
1648                    },
1649                }
1650            } else {
1651                Message::TriggerRegistrationResult {
1652                    id,
1653                    trigger_type,
1654                    function_id,
1655                    error: Some(ErrorBody {
1656                        code: "trigger_type_not_found".to_string(),
1657                        message: "Trigger type not found".to_string(),
1658                        stacktrace: None,
1659                    }),
1660                }
1661            };
1662
1663            let _ = iii.send_message(message);
1664        });
1665    }
1666}
1667
1668#[cfg(test)]
1669mod tests {
1670    use std::collections::HashMap;
1671
1672    use serde_json::json;
1673
1674    use super::*;
1675    use crate::{
1676        InitOptions,
1677        protocol::{HttpInvocationConfig, HttpMethod, RegisterTriggerInput},
1678        register_worker,
1679    };
1680
1681    #[tokio::test]
1682    async fn register_trigger_unregister_removes_entry() {
1683        let iii = register_worker("ws://localhost:1234", InitOptions::default());
1684        let trigger = iii
1685            .register_trigger(RegisterTriggerInput {
1686                trigger_type: "demo".to_string(),
1687                function_id: "functions.echo".to_string(),
1688                config: json!({ "foo": "bar" }),
1689            })
1690            .unwrap();
1691
1692        assert_eq!(iii.inner.triggers.lock().unwrap().len(), 1);
1693
1694        trigger.unregister();
1695
1696        assert_eq!(iii.inner.triggers.lock().unwrap().len(), 0);
1697    }
1698
1699    #[tokio::test]
1700    async fn register_function_with_http_config_stores_and_unregister_removes() {
1701        let iii = register_worker("ws://localhost:1234", InitOptions::default());
1702        let config = HttpInvocationConfig {
1703            url: "https://example.com/invoke".to_string(),
1704            method: HttpMethod::Post,
1705            timeout_ms: Some(30000),
1706            headers: HashMap::new(),
1707            auth: None,
1708        };
1709
1710        let func_ref = iii.register_function_with(
1711            RegisterFunctionMessage {
1712                id: "external::my_lambda".to_string(),
1713                description: None,
1714                request_format: None,
1715                response_format: None,
1716                metadata: None,
1717                invocation: None,
1718            },
1719            config,
1720        );
1721
1722        assert_eq!(func_ref.id, "external::my_lambda");
1723        assert_eq!(iii.inner.functions.lock().unwrap().len(), 1);
1724
1725        func_ref.unregister();
1726
1727        assert_eq!(iii.inner.functions.lock().unwrap().len(), 0);
1728    }
1729
1730    #[tokio::test]
1731    #[should_panic(expected = "id is required")]
1732    async fn register_function_rejects_empty_id() {
1733        let iii = register_worker("ws://localhost:1234", InitOptions::default());
1734        let config = HttpInvocationConfig {
1735            url: "https://example.com/invoke".to_string(),
1736            method: HttpMethod::Post,
1737            timeout_ms: None,
1738            headers: HashMap::new(),
1739            auth: None,
1740        };
1741
1742        iii.register_function_with(
1743            RegisterFunctionMessage {
1744                id: "".to_string(),
1745                description: None,
1746                request_format: None,
1747                response_format: None,
1748                metadata: None,
1749                invocation: None,
1750            },
1751            config,
1752        );
1753    }
1754
1755    #[tokio::test]
1756    async fn invoke_function_times_out_and_clears_pending() {
1757        let iii = register_worker("ws://localhost:1234", InitOptions::default());
1758        let result = iii
1759            .trigger(TriggerRequest {
1760                function_id: "functions.echo".to_string(),
1761                payload: json!({ "a": 1 }),
1762                action: None,
1763                timeout_ms: Some(10),
1764            })
1765            .await;
1766
1767        assert!(matches!(result, Err(IIIError::Timeout)));
1768        assert!(iii.inner.pending.lock().unwrap().is_empty());
1769    }
1770}