1use std::{
2 collections::{HashMap, HashSet},
3 sync::{
4 Arc, Mutex, MutexGuard,
5 atomic::{AtomicBool, AtomicUsize, Ordering},
6 },
7 time::Duration,
8};
9
/// Extension trait for [`Mutex`] that acquires the lock even when the mutex is poisoned.
trait MutexExt<T> {
    /// Locks the mutex and returns its guard, whether or not the mutex is poisoned.
    fn lock_or_recover(&self) -> MutexGuard<'_, T>;
}

impl<T> MutexExt<T> for Mutex<T> {
    fn lock_or_recover(&self) -> MutexGuard<'_, T> {
        // A poisoned lock still carries its guard; hand that back instead of panicking.
        match self.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }
}
21
22use futures_util::{SinkExt, StreamExt};
23use serde::{Deserialize, Serialize};
24use serde_json::Value;
25use tokio::{
26 sync::{mpsc, oneshot},
27 time::sleep,
28};
29use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
30use uuid::Uuid;
31
/// Version of this crate, read from `CARGO_PKG_VERSION` at compile time.
const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
33
34use crate::{
35 channels::{ChannelReader, ChannelWriter, StreamChannelRef},
36 error::IIIError,
37 protocol::{
38 ErrorBody, HttpInvocationConfig, Message, RegisterFunctionMessage, RegisterServiceMessage,
39 RegisterTriggerInput, RegisterTriggerMessage, RegisterTriggerTypeMessage, TriggerAction,
40 TriggerRequest, UnregisterTriggerMessage, UnregisterTriggerTypeMessage,
41 },
42 triggers::{Trigger, TriggerConfig, TriggerHandler},
43 types::{Channel, RemoteFunctionData, RemoteFunctionHandler, RemoteTriggerTypeData},
44};
45
46#[cfg(feature = "otel")]
47use crate::telemetry;
48#[cfg(feature = "otel")]
49use crate::telemetry::types::OtelConfig;
50
/// Timeout in milliseconds used by `III::trigger` when the request has no `timeout_ms`.
const DEFAULT_TIMEOUT_MS: u64 = 30_000;
52
/// Worker description, deserialized by `III::list_workers` from the `workers`
/// entry of the `engine::workers::list` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerInfo {
    /// Worker identifier.
    pub id: String,
    /// Worker name, if reported.
    pub name: Option<String>,
    /// Runtime name, if reported.
    pub runtime: Option<String>,
    /// Version string, if reported.
    pub version: Option<String>,
    /// Operating system description, if reported.
    pub os: Option<String>,
    /// IP address, if reported.
    pub ip_address: Option<String>,
    /// Status string. NOTE(review): the set of possible values is defined by the engine.
    pub status: String,
    /// Presumably the connection time as a millisecond timestamp — TODO confirm the epoch.
    pub connected_at_ms: u64,
    /// Number of functions reported for the worker.
    pub function_count: usize,
    /// Function names/ids reported for the worker.
    pub functions: Vec<String>,
    /// Number of invocations reported as active.
    pub active_invocations: usize,
}
68
/// Function description, deserialized from the `functions` entry of the
/// `engine::functions::list` response and of functions-available notifications.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionInfo {
    /// Id of the function.
    pub function_id: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Optional request format value (the typed handlers in this file fill it with a JSON schema).
    pub request_format: Option<Value>,
    /// Optional response format value (the typed handlers in this file fill it with a JSON schema).
    pub response_format: Option<Value>,
    /// Optional free-form metadata.
    pub metadata: Option<Value>,
}
78
/// Trigger description, deserialized by `III::list_triggers` from the `triggers`
/// entry of the `engine::triggers::list` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerInfo {
    /// Trigger identifier.
    pub id: String,
    /// Name of the trigger type.
    pub trigger_type: String,
    /// Id of the function associated with the trigger.
    pub function_id: String,
    /// Trigger configuration as raw JSON.
    pub config: Value,
}
87
/// Optional telemetry attributes nested in [`WorkerMetadata`].
///
/// Every field is left out of the serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkerTelemetryMeta {
    /// Language tag; `WorkerMetadata::default` derives it from the process locale variables.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub language: Option<String>,
    /// Project name, if set by the caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_name: Option<String>,
    /// Framework name, if set by the caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub framework: Option<String>,
    /// Amplitude API key, if set by the caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub amplitude_api_key: Option<String>,
}
100
/// Metadata describing this worker; it is serialized and sent as the payload of
/// `engine::workers::register` after each successful connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerMetadata {
    /// Runtime name (`"rust"` in the default value).
    pub runtime: String,
    /// SDK version (the crate version in the default value).
    pub version: String,
    /// Worker name (`hostname:pid` in the default value).
    pub name: String,
    /// Operating system description.
    pub os: String,
    /// Process id; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pid: Option<u32>,
    /// Telemetry attributes; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub telemetry: Option<WorkerTelemetryMeta>,
}
113
114impl Default for WorkerMetadata {
115 fn default() -> Self {
116 let hostname = hostname::get()
117 .map(|h| h.to_string_lossy().to_string())
118 .unwrap_or_else(|_| "unknown".to_string());
119 let pid = std::process::id();
120 let os_info = format!(
121 "{} {} ({})",
122 std::env::consts::OS,
123 std::env::consts::ARCH,
124 std::env::consts::FAMILY
125 );
126
127 let language = std::env::var("LANG")
128 .or_else(|_| std::env::var("LC_ALL"))
129 .ok()
130 .filter(|s| !s.is_empty())
131 .map(|s| s.split('.').next().unwrap_or(&s).to_string());
132
133 Self {
134 runtime: "rust".to_string(),
135 version: SDK_VERSION.to_string(),
136 name: format!("{}:{}", hostname, pid),
137 os: os_info,
138 pid: Some(pid),
139 telemetry: Some(WorkerTelemetryMeta {
140 language,
141 ..Default::default()
142 }),
143 }
144 }
145}
146
/// Item placed on the outbound queue that `III::run_connection` consumes.
#[allow(clippy::large_enum_variant)]
enum Outbound {
    /// A protocol message to write to the WebSocket.
    Message(Message),
    /// Asks the connection task to stop.
    Shutdown,
}
152
/// Sender half used to deliver an invocation's result (or error) to the waiting `III::trigger` call.
type PendingInvocation = oneshot::Sender<Result<Value, IIIError>>;
154
/// Sink half of the split WebSocket stream; outgoing frames are written through it.
type WsTx = futures_util::stream::SplitSink<
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
    WsMessage,
>;
160
/// Returns the `(traceparent, baggage)` pair produced by the telemetry context helpers.
#[cfg(feature = "otel")]
fn inject_trace_headers() -> (Option<String>, Option<String>) {
    let traceparent = crate::telemetry::context::inject_traceparent();
    let baggage = crate::telemetry::context::inject_baggage();
    (traceparent, baggage)
}
168
/// Without the `otel` feature there is no trace context: both the
/// `traceparent` and the `baggage` slot are `None`.
#[cfg(not(feature = "otel"))]
fn inject_trace_headers() -> (Option<String>, Option<String>) {
    let traceparent: Option<String> = None;
    let baggage: Option<String> = None;
    (traceparent, baggage)
}
173
/// Connection state as reported by `III::get_connection_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IIIConnectionState {
    /// Initial state; also set by `shutdown` / `shutdown_async`.
    Disconnected,
    /// A connection attempt is in progress and no connection has succeeded yet.
    Connecting,
    /// The WebSocket connection is established.
    Connected,
    /// A connection attempt is in progress after an earlier successful connection.
    Reconnecting,
    /// NOTE(review): not assigned anywhere in the visible part of this file — confirm where it is used.
    Failed,
}
183
/// Shared callback that receives the function list carried by a functions-available notification.
pub type FunctionsAvailableCallback = Arc<dyn Fn(Vec<FunctionInfo>) + Send + Sync>;
186
/// Handle returned when a function is registered; lets the caller unregister it later.
#[derive(Clone)]
pub struct FunctionRef {
    /// Id of the function this handle refers to.
    pub id: String,
    /// Closure executed by [`FunctionRef::unregister`].
    unregister_fn: Arc<dyn Fn() + Send + Sync>,
}

impl FunctionRef {
    /// Runs the stored unregister closure. Each call runs the closure again.
    pub fn unregister(&self) {
        let run: &(dyn Fn() + Send + Sync) = &*self.unregister_fn;
        run();
    }
}
198
/// Something that can act as the handler part of a function registration.
pub trait IntoFunctionHandler {
    /// Consumes `self`, optionally adjusting `message`, and returns the local
    /// handler to store — or `None` when the function has no local handler
    /// (as in the `HttpInvocationConfig` implementation).
    fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler>;
}
202
/// Something that can be passed to `III::register_function`.
pub trait IntoFunctionRegistration {
    /// Consumes `self` and yields the registration message together with the
    /// optional local handler.
    fn into_registration(self) -> (RegisterFunctionMessage, Option<RemoteFunctionHandler>);
}
211
212impl IntoFunctionRegistration for RegisterFunction {
213 fn into_registration(self) -> (RegisterFunctionMessage, Option<RemoteFunctionHandler>) {
214 (self.message, Some(self.handler))
215 }
216}
217
218impl<H: IntoFunctionHandler> IntoFunctionRegistration for (RegisterFunctionMessage, H) {
219 fn into_registration(self) -> (RegisterFunctionMessage, Option<RemoteFunctionHandler>) {
220 let (mut message, handler) = self;
221 let handler = handler.into_parts(&mut message);
222 (message, handler)
223 }
224}
225
226impl IntoFunctionHandler for HttpInvocationConfig {
227 fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
228 message.invocation = Some(self);
229 None
230 }
231}
232
233impl<F, Fut> IntoFunctionHandler for F
234where
235 F: Fn(Value) -> Fut + Send + Sync + 'static,
236 Fut: std::future::Future<Output = Result<Value, IIIError>> + Send + 'static,
237{
238 fn into_parts(self, _message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
239 Some(Arc::new(move |input: Value| Box::pin(self(input))))
240 }
241}
242
/// Typed synchronous handler created by `iii_fn`.
pub struct IIIFn<F = ()> {
    /// Type-erased handler.
    handler: RemoteFunctionHandler,
    /// Request format captured when the wrapper was built.
    request_format: Option<Value>,
    /// Response format captured when the wrapper was built.
    response_format: Option<Value>,
    /// Ties the wrapper to the original function type without storing it.
    _marker: std::marker::PhantomData<F>,
}
257
258fn json_schema_for<T: schemars::JsonSchema>() -> Option<Value> {
259 serde_json::to_value(
260 schemars::r#gen::SchemaSettings::draft07()
261 .into_generator()
262 .into_root_schema_for::<T>(),
263 )
264 .ok()
265}
266
/// Conversion from a typed synchronous function into a type-erased handler.
///
/// NOTE(review): `Marker` appears to exist only so the blanket implementation can
/// name its input/output/error types — confirm before relying on it.
#[doc(hidden)]
pub trait IntoSyncHandler<Marker>: Send + Sync + 'static {
    /// Consumes the function and returns the type-erased handler.
    fn into_handler(self) -> RemoteFunctionHandler;
    /// Request format for the handler; `None` unless overridden.
    fn request_format() -> Option<Value> {
        None
    }
    /// Response format for the handler; `None` unless overridden.
    fn response_format() -> Option<Value> {
        None
    }
}
279
280impl<F, T, R, E> IntoSyncHandler<(T, R, E)> for F
282where
283 F: Fn(T) -> Result<R, E> + Send + Sync + 'static,
284 T: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
285 R: serde::Serialize + schemars::JsonSchema + Send + 'static,
286 E: std::fmt::Display + Send + 'static,
287{
288 fn into_handler(self) -> RemoteFunctionHandler {
289 Arc::new(move |input: Value| {
290 let output = serde_json::from_value::<T>(input)
291 .map_err(|e| IIIError::Handler(e.to_string()))
292 .and_then(|arg| (self)(arg).map_err(|e| IIIError::Handler(e.to_string())))
293 .and_then(|val| {
294 serde_json::to_value(&val).map_err(|e| IIIError::Handler(e.to_string()))
295 });
296 Box::pin(async move { output })
297 })
298 }
299
300 fn request_format() -> Option<Value> {
301 json_schema_for::<T>()
302 }
303
304 fn response_format() -> Option<Value> {
305 json_schema_for::<R>()
306 }
307}
308
309pub fn iii_fn<F, M>(f: F) -> IIIFn<F>
320where
321 F: IntoSyncHandler<M>,
322{
323 IIIFn {
324 request_format: F::request_format(),
325 response_format: F::response_format(),
326 handler: f.into_handler(),
327 _marker: std::marker::PhantomData,
328 }
329}
330
331impl<F> IntoFunctionHandler for IIIFn<F> {
332 fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
333 if message.request_format.is_none() {
334 message.request_format = self.request_format;
335 }
336 if message.response_format.is_none() {
337 message.response_format = self.response_format;
338 }
339 Some(self.handler)
340 }
341}
342
/// Typed asynchronous handler created by `iii_async_fn`.
pub struct IIIAsyncFn<F = ()> {
    /// Type-erased handler.
    handler: RemoteFunctionHandler,
    /// Request format captured when the wrapper was built.
    request_format: Option<Value>,
    /// Response format captured when the wrapper was built.
    response_format: Option<Value>,
    /// Ties the wrapper to the original function type without storing it.
    _marker: std::marker::PhantomData<F>,
}
357
/// Conversion from a typed asynchronous function into a type-erased handler.
///
/// NOTE(review): `Marker` appears to exist only so the blanket implementation can
/// name its input/future/output/error types — confirm before relying on it.
#[doc(hidden)]
pub trait IntoAsyncHandler<Marker>: Send + Sync + 'static {
    /// Consumes the function and returns the type-erased handler.
    fn into_handler(self) -> RemoteFunctionHandler;
    /// Request format for the handler; `None` unless overridden.
    fn request_format() -> Option<Value> {
        None
    }
    /// Response format for the handler; `None` unless overridden.
    fn response_format() -> Option<Value> {
        None
    }
}
370
371impl<F, T, Fut, R, E> IntoAsyncHandler<(T, Fut, R, E)> for F
373where
374 F: Fn(T) -> Fut + Send + Sync + 'static,
375 T: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
376 Fut: std::future::Future<Output = Result<R, E>> + Send + 'static,
377 R: serde::Serialize + schemars::JsonSchema + Send + 'static,
378 E: std::fmt::Display + Send + 'static,
379{
380 fn into_handler(self) -> RemoteFunctionHandler {
381 Arc::new(
382 move |input: Value| -> std::pin::Pin<
383 Box<dyn std::future::Future<Output = Result<Value, IIIError>> + Send>,
384 > {
385 match serde_json::from_value::<T>(input) {
386 Ok(arg) => {
387 let fut = (self)(arg);
388 Box::pin(async move {
389 fut.await
390 .map_err(|e| IIIError::Handler(e.to_string()))
391 .and_then(|val| {
392 serde_json::to_value(&val)
393 .map_err(|e| IIIError::Handler(e.to_string()))
394 })
395 })
396 }
397 Err(e) => Box::pin(async move { Err(IIIError::Handler(e.to_string())) }),
398 }
399 },
400 )
401 }
402
403 fn request_format() -> Option<Value> {
404 json_schema_for::<T>()
405 }
406
407 fn response_format() -> Option<Value> {
408 json_schema_for::<R>()
409 }
410}
411
412pub fn iii_async_fn<F, M>(f: F) -> IIIAsyncFn<F>
418where
419 F: IntoAsyncHandler<M>,
420{
421 IIIAsyncFn {
422 request_format: F::request_format(),
423 response_format: F::response_format(),
424 handler: f.into_handler(),
425 _marker: std::marker::PhantomData,
426 }
427}
428
429impl<F> IntoFunctionHandler for IIIAsyncFn<F> {
430 fn into_parts(self, message: &mut RegisterFunctionMessage) -> Option<RemoteFunctionHandler> {
431 if message.request_format.is_none() {
432 message.request_format = self.request_format;
433 }
434 if message.response_format.is_none() {
435 message.response_format = self.response_format;
436 }
437 Some(self.handler)
438 }
439}
440
/// Builder pairing a registration message with its local handler; accepted by
/// `III::register_function`.
pub struct RegisterFunction {
    /// Registration message that will be sent to the engine.
    message: RegisterFunctionMessage,
    /// Handler stored locally for the function.
    handler: RemoteFunctionHandler,
}
453
454impl RegisterFunction {
455 pub fn new<F, M>(id: impl Into<String>, f: F) -> Self
457 where
458 F: IntoSyncHandler<M>,
459 {
460 Self {
461 message: RegisterFunctionMessage {
462 id: id.into(),
463 description: None,
464 request_format: F::request_format(),
465 response_format: F::response_format(),
466 metadata: None,
467 invocation: None,
468 },
469 handler: f.into_handler(),
470 }
471 }
472
473 pub fn new_async<F, M>(id: impl Into<String>, f: F) -> Self
475 where
476 F: IntoAsyncHandler<M>,
477 {
478 Self {
479 message: RegisterFunctionMessage {
480 id: id.into(),
481 description: None,
482 request_format: F::request_format(),
483 response_format: F::response_format(),
484 metadata: None,
485 invocation: None,
486 },
487 handler: f.into_handler(),
488 }
489 }
490
491 pub fn description(mut self, desc: impl Into<String>) -> Self {
493 self.message.description = Some(desc.into());
494 self
495 }
496
497 pub fn metadata(mut self, meta: Value) -> Self {
499 self.message.metadata = Some(meta);
500 self
501 }
502
503 pub fn request_format(&self) -> Option<&Value> {
505 self.message.request_format.as_ref()
506 }
507
508 pub fn response_format(&self) -> Option<&Value> {
510 self.message.response_format.as_ref()
511 }
512}
513
/// Shared state behind every clone of [`III`].
struct IIIInner {
    /// Address handed to `connect_async` and to the channel reader/writer constructors.
    address: String,
    /// Producer side of the outbound queue consumed by the connection task.
    outbound: mpsc::UnboundedSender<Outbound>,
    /// Consumer side of the outbound queue; taken once by `connect`.
    receiver: Mutex<Option<mpsc::UnboundedReceiver<Outbound>>>,
    /// While `true` the connection loop keeps running and `send_message` enqueues messages.
    running: AtomicBool,
    /// Set by `connect` so that the connection task is only started once.
    started: AtomicBool,
    /// Invocations waiting for an `InvocationResult`, keyed by invocation id.
    pending: Mutex<HashMap<Uuid, PendingInvocation>>,
    /// Registered functions by id; re-sent on every (re)connect.
    functions: Mutex<HashMap<String, RemoteFunctionData>>,
    /// Registered trigger types by id; re-sent on every (re)connect.
    trigger_types: Mutex<HashMap<String, RemoteTriggerTypeData>>,
    /// Registered triggers by id; re-sent on every (re)connect.
    triggers: Mutex<HashMap<String, RegisterTriggerMessage>>,
    /// Registered services by id; re-sent on every (re)connect.
    services: Mutex<HashMap<String, RegisterServiceMessage>>,
    /// Metadata sent to `engine::workers::register` after connecting.
    worker_metadata: Mutex<Option<WorkerMetadata>>,
    /// Current connection state.
    connection_state: Mutex<IIIConnectionState>,
    /// Functions-available callbacks keyed by the id handed to their guard.
    functions_available_callbacks: Mutex<HashMap<usize, FunctionsAvailableCallback>>,
    /// Source of ids for `functions_available_callbacks`.
    functions_available_callback_counter: AtomicUsize,
    /// Id of the internal function that fans notifications out to the callbacks.
    functions_available_function_id: Mutex<Option<String>>,
    /// Trigger registered for `engine::functions-available`, while at least one callback exists.
    functions_available_trigger: Mutex<Option<Trigger>>,
    /// OpenTelemetry configuration; taken by `connect`.
    #[cfg(feature = "otel")]
    otel_config: Mutex<Option<OtelConfig>>,
}
534
/// Client handle. Cloning is cheap: all clones share the same inner state.
#[derive(Clone)]
pub struct III {
    /// Shared state.
    inner: Arc<IIIInner>,
}
542
/// Guard returned by `III::on_functions_available`; dropping it removes the callback.
pub struct FunctionsAvailableGuard {
    /// Client the callback was registered on.
    iii: III,
    /// Key of the callback in the client's callback map.
    callback_id: usize,
}
548
549impl Drop for FunctionsAvailableGuard {
550 fn drop(&mut self) {
551 let mut callbacks = self
552 .iii
553 .inner
554 .functions_available_callbacks
555 .lock_or_recover();
556 callbacks.remove(&self.callback_id);
557
558 if callbacks.is_empty() {
559 let mut trigger = self.iii.inner.functions_available_trigger.lock_or_recover();
560 if let Some(trigger) = trigger.take() {
561 trigger.unregister();
562 }
563 }
564 }
565}
566
567impl III {
568 pub fn new(address: &str) -> Self {
570 Self::with_metadata(address, WorkerMetadata::default())
571 }
572
573 pub fn with_metadata(address: &str, metadata: WorkerMetadata) -> Self {
575 let (tx, rx) = mpsc::unbounded_channel();
576 let inner = IIIInner {
577 address: address.into(),
578 outbound: tx,
579 receiver: Mutex::new(Some(rx)),
580 running: AtomicBool::new(false),
581 started: AtomicBool::new(false),
582 pending: Mutex::new(HashMap::new()),
583 functions: Mutex::new(HashMap::new()),
584 trigger_types: Mutex::new(HashMap::new()),
585 triggers: Mutex::new(HashMap::new()),
586 services: Mutex::new(HashMap::new()),
587 worker_metadata: Mutex::new(Some(metadata)),
588 connection_state: Mutex::new(IIIConnectionState::Disconnected),
589 functions_available_callbacks: Mutex::new(HashMap::new()),
590 functions_available_callback_counter: AtomicUsize::new(0),
591 functions_available_function_id: Mutex::new(None),
592 functions_available_trigger: Mutex::new(None),
593 #[cfg(feature = "otel")]
594 otel_config: Mutex::new(None),
595 };
596 Self {
597 inner: Arc::new(inner),
598 }
599 }
600
601 pub fn address(&self) -> &str {
603 &self.inner.address
604 }
605
606 pub fn set_metadata(&self, metadata: WorkerMetadata) {
608 *self.inner.worker_metadata.lock_or_recover() = Some(metadata);
609 }
610
611 #[cfg(feature = "otel")]
613 pub fn set_otel_config(&self, config: OtelConfig) {
614 *self.inner.otel_config.lock_or_recover() = Some(config);
615 }
616
617 pub(crate) fn connect(&self) {
618 if self.inner.started.swap(true, Ordering::SeqCst) {
619 return;
620 }
621
622 let receiver = self.inner.receiver.lock_or_recover().take();
623 let Some(rx) = receiver else { return };
624
625 self.inner.running.store(true, Ordering::SeqCst);
626
627 let iii = self.clone();
628 tokio::spawn(async move {
629 iii.run_connection(rx).await;
630 });
631
632 #[cfg(feature = "otel")]
633 {
634 let config = self.inner.otel_config.lock_or_recover().take();
635 if let Some(mut config) = config {
636 if config.engine_ws_url.is_none() {
637 config.engine_ws_url = Some(self.inner.address.clone());
638 }
639 tokio::spawn(async move {
640 telemetry::init_otel(config).await;
641 });
642 }
643 }
644 }
645
646 #[deprecated(note = "Use shutdown_async() for guaranteed telemetry flush")]
653 pub fn shutdown(&self) {
654 self.inner.running.store(false, Ordering::SeqCst);
655 let _ = self.inner.outbound.send(Outbound::Shutdown);
656 self.set_connection_state(IIIConnectionState::Disconnected);
657
658 #[cfg(feature = "otel")]
659 {
660 tracing::warn!(
661 "shutdown() does not await telemetry flush; use shutdown_async() instead"
662 );
663 tokio::spawn(async {
664 telemetry::shutdown_otel().await;
665 });
666 }
667 }
668
669 pub async fn shutdown_async(&self) {
676 self.inner.running.store(false, Ordering::SeqCst);
677 let _ = self.inner.outbound.send(Outbound::Shutdown);
678 self.set_connection_state(IIIConnectionState::Disconnected);
679
680 #[cfg(feature = "otel")]
681 telemetry::shutdown_otel().await;
682 }
683
684 fn register_function_inner(
685 &self,
686 message: RegisterFunctionMessage,
687 handler: Option<RemoteFunctionHandler>,
688 ) -> FunctionRef {
689 let id = message.id.clone();
690 if id.trim().is_empty() {
691 panic!("id is required");
692 }
693 let data = RemoteFunctionData {
694 message: message.clone(),
695 handler,
696 };
697 let mut funcs = self.inner.functions.lock_or_recover();
698 match funcs.entry(id.clone()) {
699 std::collections::hash_map::Entry::Occupied(_) => {
700 panic!("function id '{}' already registered", id);
701 }
702 std::collections::hash_map::Entry::Vacant(entry) => {
703 entry.insert(data);
704 }
705 }
706 drop(funcs);
707 let _ = self.send_message(message.to_message());
708
709 let iii = self.clone();
710 let unregister_id = id.clone();
711 let unregister_fn = Arc::new(move || {
712 let _ = iii.inner.functions.lock_or_recover().remove(&unregister_id);
713 let _ = iii.send_message(Message::UnregisterFunction {
714 id: unregister_id.clone(),
715 });
716 });
717
718 FunctionRef { id, unregister_fn }
719 }
720
721 pub fn register_function<R: IntoFunctionRegistration>(&self, registration: R) -> FunctionRef {
760 let (message, handler) = registration.into_registration();
761 self.register_function_inner(message, handler)
762 }
763
764 pub fn register_function_with<H: IntoFunctionHandler>(
766 &self,
767 mut message: RegisterFunctionMessage,
768 handler: H,
769 ) -> FunctionRef {
770 let handler = handler.into_parts(&mut message);
771 self.register_function_inner(message, handler)
772 }
773
774 pub fn register_service(&self, message: RegisterServiceMessage) {
779 self.inner
780 .services
781 .lock_or_recover()
782 .insert(message.id.clone(), message.clone());
783 let _ = self.send_message(message.to_message());
784 }
785
786 pub fn register_trigger_type<H>(
793 &self,
794 id: impl Into<String>,
795 description: impl Into<String>,
796 handler: H,
797 ) where
798 H: TriggerHandler + 'static,
799 {
800 let message = RegisterTriggerTypeMessage {
801 id: id.into(),
802 description: description.into(),
803 };
804
805 self.inner.trigger_types.lock_or_recover().insert(
806 message.id.clone(),
807 RemoteTriggerTypeData {
808 message: message.clone(),
809 handler: Arc::new(handler),
810 },
811 );
812
813 let _ = self.send_message(message.to_message());
814 }
815
816 pub fn unregister_trigger_type(&self, id: impl Into<String>) {
818 let id = id.into();
819 self.inner.trigger_types.lock_or_recover().remove(&id);
820 let msg = UnregisterTriggerTypeMessage { id };
821 let _ = self.send_message(msg.to_message());
822 }
823
824 pub fn register_trigger(&self, input: RegisterTriggerInput) -> Result<Trigger, IIIError> {
844 let id = Uuid::new_v4().to_string();
845 let message = RegisterTriggerMessage {
846 id: id.clone(),
847 trigger_type: input.trigger_type,
848 function_id: input.function_id,
849 config: input.config,
850 };
851
852 self.inner
853 .triggers
854 .lock_or_recover()
855 .insert(message.id.clone(), message.clone());
856 let _ = self.send_message(message.to_message());
857
858 let iii = self.clone();
859 let trigger_type = message.trigger_type.clone();
860 let unregister_id = message.id.clone();
861 let unregister_fn = Arc::new(move || {
862 let _ = iii.inner.triggers.lock_or_recover().remove(&unregister_id);
863 let msg = UnregisterTriggerMessage {
864 id: unregister_id.clone(),
865 trigger_type: trigger_type.clone(),
866 };
867 let _ = iii.send_message(msg.to_message());
868 });
869
870 Ok(Trigger::new(unregister_fn))
871 }
872
873 pub async fn trigger(
913 &self,
914 request: impl Into<crate::protocol::TriggerRequest>,
915 ) -> Result<Value, IIIError> {
916 let req = request.into();
917 let (tp, bg) = inject_trace_headers();
918
919 if matches!(req.action, Some(TriggerAction::Void)) {
921 self.send_message(Message::InvokeFunction {
922 invocation_id: None,
923 function_id: req.function_id,
924 data: req.payload,
925 traceparent: tp,
926 baggage: bg,
927 action: req.action,
928 })?;
929 return Ok(Value::Null);
930 }
931
932 let timeout = Duration::from_millis(req.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS));
934 let invocation_id = Uuid::new_v4();
935 let (tx, rx) = oneshot::channel();
936
937 self.inner
938 .pending
939 .lock_or_recover()
940 .insert(invocation_id, tx);
941
942 self.send_message(Message::InvokeFunction {
943 invocation_id: Some(invocation_id),
944 function_id: req.function_id,
945 data: req.payload,
946 traceparent: tp,
947 baggage: bg,
948 action: req.action,
949 })?;
950
951 match tokio::time::timeout(timeout, rx).await {
952 Ok(Ok(result)) => result,
953 Ok(Err(_)) => Err(IIIError::NotConnected),
954 Err(_) => {
955 self.inner.pending.lock_or_recover().remove(&invocation_id);
956 Err(IIIError::Timeout)
957 }
958 }
959 }
960
961 pub fn get_connection_state(&self) -> IIIConnectionState {
963 *self.inner.connection_state.lock_or_recover()
964 }
965
966 fn set_connection_state(&self, state: IIIConnectionState) {
967 let mut current = self.inner.connection_state.lock_or_recover();
968 if *current == state {
969 return;
970 }
971 *current = state;
972 }
973
974 pub async fn list_functions(&self) -> Result<Vec<FunctionInfo>, IIIError> {
976 let result = self
977 .trigger(TriggerRequest {
978 function_id: "engine::functions::list".to_string(),
979 payload: serde_json::json!({}),
980 action: None,
981 timeout_ms: None,
982 })
983 .await?;
984
985 let functions = result
986 .get("functions")
987 .and_then(|v| serde_json::from_value::<Vec<FunctionInfo>>(v.clone()).ok())
988 .unwrap_or_default();
989
990 Ok(functions)
991 }
992
993 pub fn on_functions_available<F>(&self, callback: F) -> FunctionsAvailableGuard
996 where
997 F: Fn(Vec<FunctionInfo>) + Send + Sync + 'static,
998 {
999 let callback = Arc::new(callback);
1000 let callback_id = self
1001 .inner
1002 .functions_available_callback_counter
1003 .fetch_add(1, Ordering::Relaxed);
1004
1005 self.inner
1006 .functions_available_callbacks
1007 .lock_or_recover()
1008 .insert(callback_id, callback);
1009
1010 let mut trigger_guard = self.inner.functions_available_trigger.lock_or_recover();
1012 if trigger_guard.is_none() {
1013 let function_id = {
1015 let mut path_guard = self.inner.functions_available_function_id.lock_or_recover();
1016 if path_guard.is_none() {
1017 let path = format!("iii.on_functions_available.{}", Uuid::new_v4());
1018 *path_guard = Some(path.clone());
1019 path
1020 } else {
1021 path_guard.clone().unwrap()
1022 }
1023 };
1024
1025 let function_exists = self
1027 .inner
1028 .functions
1029 .lock_or_recover()
1030 .contains_key(&function_id);
1031 if !function_exists {
1032 let iii = self.clone();
1033 self.register_function_with(
1034 RegisterFunctionMessage {
1035 id: function_id.clone(),
1036 description: None,
1037 request_format: None,
1038 response_format: None,
1039 metadata: None,
1040 invocation: None,
1041 },
1042 move |input: Value| {
1043 let iii = iii.clone();
1044 async move {
1045 let functions = input
1046 .get("functions")
1047 .and_then(|v| {
1048 serde_json::from_value::<Vec<FunctionInfo>>(v.clone()).ok()
1049 })
1050 .unwrap_or_default();
1051
1052 let callbacks =
1053 iii.inner.functions_available_callbacks.lock_or_recover();
1054 for cb in callbacks.values() {
1055 cb(functions.clone());
1056 }
1057 Ok(Value::Null)
1058 }
1059 },
1060 );
1061 }
1062
1063 match self.register_trigger(RegisterTriggerInput {
1064 trigger_type: "engine::functions-available".to_string(),
1065 function_id,
1066 config: serde_json::json!({}),
1067 }) {
1068 Ok(trigger) => {
1069 *trigger_guard = Some(trigger);
1070 }
1071 Err(err) => {
1072 tracing::warn!(error = %err, "Failed to register functions_available trigger");
1073 }
1074 }
1075 }
1076
1077 FunctionsAvailableGuard {
1078 iii: self.clone(),
1079 callback_id,
1080 }
1081 }
1082
1083 pub async fn list_workers(&self) -> Result<Vec<WorkerInfo>, IIIError> {
1085 let result = self
1086 .trigger(TriggerRequest {
1087 function_id: "engine::workers::list".to_string(),
1088 payload: serde_json::json!({}),
1089 action: None,
1090 timeout_ms: None,
1091 })
1092 .await?;
1093
1094 let workers = result
1095 .get("workers")
1096 .and_then(|v| serde_json::from_value::<Vec<WorkerInfo>>(v.clone()).ok())
1097 .unwrap_or_default();
1098
1099 Ok(workers)
1100 }
1101
1102 pub async fn list_triggers(
1104 &self,
1105 include_internal: bool,
1106 ) -> Result<Vec<TriggerInfo>, IIIError> {
1107 let result = self
1108 .trigger(TriggerRequest {
1109 function_id: "engine::triggers::list".to_string(),
1110 payload: serde_json::json!({ "include_internal": include_internal }),
1111 action: None,
1112 timeout_ms: None,
1113 })
1114 .await?;
1115
1116 let triggers = result
1117 .get("triggers")
1118 .and_then(|v| serde_json::from_value::<Vec<TriggerInfo>>(v.clone()).ok())
1119 .unwrap_or_default();
1120
1121 Ok(triggers)
1122 }
1123
1124 pub async fn create_channel(&self, buffer_size: Option<usize>) -> Result<Channel, IIIError> {
1129 let result = self
1130 .trigger(TriggerRequest {
1131 function_id: "engine::channels::create".to_string(),
1132 payload: serde_json::json!({ "buffer_size": buffer_size }),
1133 action: None,
1134 timeout_ms: None,
1135 })
1136 .await?;
1137
1138 let writer_ref: StreamChannelRef = serde_json::from_value(
1139 result
1140 .get("writer")
1141 .cloned()
1142 .ok_or_else(|| IIIError::Serde("missing 'writer' in channel response".into()))?,
1143 )
1144 .map_err(|e| IIIError::Serde(e.to_string()))?;
1145
1146 let reader_ref: StreamChannelRef = serde_json::from_value(
1147 result
1148 .get("reader")
1149 .cloned()
1150 .ok_or_else(|| IIIError::Serde("missing 'reader' in channel response".into()))?,
1151 )
1152 .map_err(|e| IIIError::Serde(e.to_string()))?;
1153
1154 Ok(Channel {
1155 writer: ChannelWriter::new(&self.inner.address, &writer_ref),
1156 reader: ChannelReader::new(&self.inner.address, &reader_ref),
1157 writer_ref,
1158 reader_ref,
1159 })
1160 }
1161
1162 fn register_worker_metadata(&self) {
1164 if let Some(metadata) = self.inner.worker_metadata.lock_or_recover().clone() {
1165 if let Ok(value) = serde_json::to_value(metadata) {
1166 let _ = self.send_message(Message::InvokeFunction {
1167 invocation_id: None,
1168 function_id: "engine::workers::register".to_string(),
1169 data: value,
1170 traceparent: None,
1171 baggage: None,
1172 action: Some(TriggerAction::Void),
1173 });
1174 }
1175 }
1176 }
1177
1178 fn send_message(&self, message: Message) -> Result<(), IIIError> {
1179 if !self.inner.running.load(Ordering::SeqCst) {
1180 return Ok(());
1181 }
1182
1183 self.inner
1184 .outbound
1185 .send(Outbound::Message(message))
1186 .map_err(|_| IIIError::NotConnected)
1187 }
1188
    /// Connection task: connects, replays registrations, then pumps messages in
    /// both directions until shutdown, reconnecting after failures.
    ///
    /// `rx` is the consumer side of the outbound queue. `queue` holds messages
    /// that still have to be written on the next successful connection.
    async fn run_connection(&self, mut rx: mpsc::UnboundedReceiver<Outbound>) {
        let mut queue: Vec<Message> = Vec::new();
        let mut has_connected_before = false;

        while self.inner.running.load(Ordering::SeqCst) {
            // Distinguish the very first attempt from later reconnects.
            self.set_connection_state(if has_connected_before {
                IIIConnectionState::Reconnecting
            } else {
                IIIConnectionState::Connecting
            });

            match connect_async(&self.inner.address).await {
                Ok((stream, _)) => {
                    tracing::info!(address = %self.inner.address, "iii connected");
                    has_connected_before = true;
                    self.set_connection_state(IIIConnectionState::Connected);
                    let (mut ws_tx, mut ws_rx) = stream.split();

                    // Replay every known registration plus anything left over
                    // from the previous connection, without duplicates.
                    queue.extend(self.collect_registrations());
                    Self::dedupe_registrations(&mut queue);
                    if let Err(err) = self.flush_queue(&mut ws_tx, &mut queue).await {
                        tracing::warn!(error = %err, "failed to flush queue");
                        sleep(Duration::from_secs(2)).await;
                        continue;
                    }

                    self.register_worker_metadata();

                    let mut should_reconnect = false;

                    while self.inner.running.load(Ordering::SeqCst) && !should_reconnect {
                        tokio::select! {
                            outgoing = rx.recv() => {
                                match outgoing {
                                    Some(Outbound::Message(message)) => {
                                        if let Err(err) = self.send_ws(&mut ws_tx, &message).await {
                                            tracing::warn!(error = %err, "send failed; reconnecting");
                                            // Keep the message so it is retried after reconnecting.
                                            queue.push(message);
                                            should_reconnect = true;
                                        }
                                    }
                                    Some(Outbound::Shutdown) => {
                                        self.inner.running.store(false, Ordering::SeqCst);
                                        return;
                                    }
                                    // Every sender is gone: nothing more can be sent.
                                    None => {
                                        self.inner.running.store(false, Ordering::SeqCst);
                                        return;
                                    }
                                }
                            }
                            incoming = ws_rx.next() => {
                                match incoming {
                                    Some(Ok(frame)) => {
                                        // A frame that cannot be handled is logged, not fatal.
                                        if let Err(err) = self.handle_frame(frame) {
                                            tracing::warn!(error = %err, "failed to handle frame");
                                        }
                                    }
                                    Some(Err(err)) => {
                                        tracing::warn!(error = %err, "websocket receive error");
                                        should_reconnect = true;
                                    }
                                    // The stream ended.
                                    None => {
                                        should_reconnect = true;
                                    }
                                }
                            }
                        }
                    }
                }
                Err(err) => {
                    tracing::warn!(error = %err, "failed to connect; retrying");
                }
            }

            // Pause before the next attempt unless the client is shutting down.
            if self.inner.running.load(Ordering::SeqCst) {
                sleep(Duration::from_secs(2)).await;
            }
        }
    }
1270
1271 fn collect_registrations(&self) -> Vec<Message> {
1272 let mut messages = Vec::new();
1273
1274 for trigger_type in self.inner.trigger_types.lock_or_recover().values() {
1275 messages.push(trigger_type.message.to_message());
1276 }
1277
1278 for service in self.inner.services.lock_or_recover().values() {
1279 messages.push(service.to_message());
1280 }
1281
1282 for function in self.inner.functions.lock_or_recover().values() {
1283 messages.push(function.message.to_message());
1284 }
1285
1286 for trigger in self.inner.triggers.lock_or_recover().values() {
1287 messages.push(trigger.to_message());
1288 }
1289
1290 messages
1291 }
1292
1293 fn dedupe_registrations(queue: &mut Vec<Message>) {
1294 let mut seen = HashSet::new();
1295 let mut deduped_rev = Vec::with_capacity(queue.len());
1296
1297 for message in queue.iter().rev() {
1298 let key = match message {
1299 Message::RegisterTriggerType { id, .. } => format!("trigger_type:{id}"),
1300 Message::RegisterTrigger { id, .. } => format!("trigger:{id}"),
1301 Message::RegisterFunction { id, .. } => {
1302 format!("function:{id}")
1303 }
1304 Message::RegisterService { id, .. } => format!("service:{id}"),
1305 _ => {
1306 deduped_rev.push(message.clone());
1307 continue;
1308 }
1309 };
1310
1311 if seen.insert(key) {
1312 deduped_rev.push(message.clone());
1313 }
1314 }
1315
1316 deduped_rev.reverse();
1317 *queue = deduped_rev;
1318 }
1319
1320 async fn flush_queue(
1321 &self,
1322 ws_tx: &mut WsTx,
1323 queue: &mut Vec<Message>,
1324 ) -> Result<(), IIIError> {
1325 let mut drained = Vec::new();
1326 std::mem::swap(queue, &mut drained);
1327
1328 let mut iter = drained.into_iter();
1329 while let Some(message) = iter.next() {
1330 if let Err(err) = self.send_ws(ws_tx, &message).await {
1331 queue.push(message);
1332 queue.extend(iter);
1333 return Err(err);
1334 }
1335 }
1336
1337 Ok(())
1338 }
1339
1340 async fn send_ws(&self, ws_tx: &mut WsTx, message: &Message) -> Result<(), IIIError> {
1341 let payload = serde_json::to_string(message)?;
1342 ws_tx.send(WsMessage::Text(payload.into())).await?;
1343 Ok(())
1344 }
1345
1346 fn handle_frame(&self, frame: WsMessage) -> Result<(), IIIError> {
1347 match frame {
1348 WsMessage::Text(text) => self.handle_message(&text),
1349 WsMessage::Binary(bytes) => {
1350 let text = String::from_utf8_lossy(&bytes).to_string();
1351 self.handle_message(&text)
1352 }
1353 _ => Ok(()),
1354 }
1355 }
1356
1357 fn handle_message(&self, payload: &str) -> Result<(), IIIError> {
1358 let message: Message = serde_json::from_str(payload)?;
1359
1360 match message {
1361 Message::InvocationResult {
1362 invocation_id,
1363 result,
1364 error,
1365 ..
1366 } => {
1367 self.handle_invocation_result(invocation_id, result, error);
1368 }
1369 Message::InvokeFunction {
1370 invocation_id,
1371 function_id,
1372 data,
1373 traceparent,
1374 baggage,
1375 action: _,
1376 } => {
1377 self.handle_invoke_function(invocation_id, function_id, data, traceparent, baggage);
1378 }
1379 Message::RegisterTrigger {
1380 id,
1381 trigger_type,
1382 function_id,
1383 config,
1384 } => {
1385 self.handle_register_trigger(id, trigger_type, function_id, config);
1386 }
1387 Message::Ping => {
1388 let _ = self.send_message(Message::Pong);
1389 }
1390 Message::WorkerRegistered { worker_id } => {
1391 tracing::debug!(worker_id = %worker_id, "Worker registered");
1392 }
1393 _ => {}
1394 }
1395
1396 Ok(())
1397 }
1398
1399 fn handle_invocation_result(
1400 &self,
1401 invocation_id: Uuid,
1402 result: Option<Value>,
1403 error: Option<ErrorBody>,
1404 ) {
1405 let sender = self.inner.pending.lock_or_recover().remove(&invocation_id);
1406 if let Some(sender) = sender {
1407 let result = match error {
1408 Some(error) => Err(IIIError::Remote {
1409 code: error.code,
1410 message: error.message,
1411 stacktrace: error.stacktrace,
1412 }),
1413 None => Ok(result.unwrap_or(Value::Null)),
1414 };
1415 let _ = sender.send(result);
1416 }
1417 }
1418
1419 fn handle_invoke_function(
1420 &self,
1421 invocation_id: Option<Uuid>,
1422 function_id: String,
1423 data: Value,
1424 traceparent: Option<String>,
1425 baggage: Option<String>,
1426 ) {
1427 tracing::debug!(function_id = %function_id, traceparent = ?traceparent, baggage = ?baggage, "Invoking function");
1428
1429 let func_data = self
1430 .inner
1431 .functions
1432 .lock_or_recover()
1433 .get(&function_id)
1434 .cloned();
1435 let handler = func_data.as_ref().and_then(|d| d.handler.clone());
1436
1437 let Some(handler) = handler else {
1438 let (code, message) = match &func_data {
1439 Some(_) => (
1440 "function_not_invokable".to_string(),
1441 "Function is HTTP-invoked and cannot be invoked locally".to_string(),
1442 ),
1443 None => (
1444 "function_not_found".to_string(),
1445 "Function not found".to_string(),
1446 ),
1447 };
1448 tracing::warn!(function_id = %function_id, "Invocation: {}", message);
1449
1450 if let Some(invocation_id) = invocation_id {
1451 let (resp_tp, resp_bg) = inject_trace_headers();
1452
1453 let error = ErrorBody {
1454 code,
1455 message,
1456 stacktrace: None,
1457 };
1458 let result = self.send_message(Message::InvocationResult {
1459 invocation_id,
1460 function_id,
1461 result: None,
1462 error: Some(error),
1463 traceparent: resp_tp,
1464 baggage: resp_bg,
1465 });
1466
1467 if let Err(err) = result {
1468 tracing::warn!(error = %err, "error sending invocation result");
1469 }
1470 }
1471 return;
1472 };
1473
1474 let iii = self.clone();
1475
1476 tokio::spawn(async move {
1477 #[cfg(feature = "otel")]
1483 let otel_cx = {
1484 use crate::telemetry::context::extract_context;
1485 use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer};
1486
1487 let parent_cx = extract_context(traceparent.as_deref(), baggage.as_deref());
1488 let tracer = opentelemetry::global::tracer("iii-rust-sdk");
1489 let span = tracer
1490 .span_builder(format!("call {}", function_id))
1491 .with_kind(SpanKind::Server)
1492 .start_with_context(&tracer, &parent_cx);
1493 parent_cx.with_span(span)
1494 };
1495
1496 #[cfg(feature = "otel")]
1497 let result = {
1498 use opentelemetry::trace::FutureExt as OtelFutureExt;
1499 handler(data).with_context(otel_cx.clone()).await
1500 };
1501
1502 #[cfg(not(feature = "otel"))]
1503 let result = handler(data).await;
1504
1505 #[allow(unused_mut)]
1507 let mut error_stacktrace: Option<String> = None;
1508 #[cfg(feature = "otel")]
1509 {
1510 use opentelemetry::KeyValue;
1511 use opentelemetry::trace::{Status, TraceContextExt};
1512 let span = otel_cx.span();
1513 match &result {
1514 Ok(_) => span.set_status(Status::Ok),
1515 Err(err) => {
1516 let (exc_type, exc_message, stacktrace) = match err {
1517 IIIError::Remote {
1518 code,
1519 message,
1520 stacktrace,
1521 } => (
1522 code.clone(),
1523 message.clone(),
1524 stacktrace.clone().unwrap_or_else(|| {
1525 std::backtrace::Backtrace::force_capture().to_string()
1526 }),
1527 ),
1528 other => (
1529 "InvocationError".to_string(),
1530 other.to_string(),
1531 std::backtrace::Backtrace::force_capture().to_string(),
1532 ),
1533 };
1534 span.set_status(Status::error(exc_message.clone()));
1535 span.add_event(
1536 "exception",
1537 vec![
1538 KeyValue::new("exception.type", exc_type),
1539 KeyValue::new("exception.message", exc_message),
1540 KeyValue::new("exception.stacktrace", stacktrace.clone()),
1541 ],
1542 );
1543 error_stacktrace = Some(stacktrace);
1544 }
1545 }
1546 }
1547
1548 if let Some(invocation_id) = invocation_id {
1549 #[cfg(feature = "otel")]
1553 let (resp_tp, resp_bg) = {
1554 let _guard = otel_cx.attach();
1555 inject_trace_headers()
1556 };
1557 #[cfg(not(feature = "otel"))]
1558 let (resp_tp, resp_bg) = inject_trace_headers();
1559
1560 let message = match result {
1561 Ok(value) => Message::InvocationResult {
1562 invocation_id,
1563 function_id,
1564 result: Some(value),
1565 error: None,
1566 traceparent: resp_tp,
1567 baggage: resp_bg,
1568 },
1569 Err(err) => {
1570 let error_body = match err {
1571 IIIError::Remote {
1572 code,
1573 message,
1574 stacktrace,
1575 } => ErrorBody {
1576 code,
1577 message,
1578 stacktrace: stacktrace.or(error_stacktrace).or_else(|| {
1579 Some(std::backtrace::Backtrace::force_capture().to_string())
1580 }),
1581 },
1582 other => ErrorBody {
1583 code: "invocation_failed".to_string(),
1584 message: other.to_string(),
1585 stacktrace: error_stacktrace.or_else(|| {
1586 Some(std::backtrace::Backtrace::force_capture().to_string())
1587 }),
1588 },
1589 };
1590 Message::InvocationResult {
1591 invocation_id,
1592 function_id,
1593 result: None,
1594 error: Some(error_body),
1595 traceparent: resp_tp,
1596 baggage: resp_bg,
1597 }
1598 }
1599 };
1600
1601 let _ = iii.send_message(message);
1602 } else if let Err(err) = result {
1603 tracing::warn!(error = %err, "error handling async invocation");
1604 }
1605 });
1606 }
1607
1608 fn handle_register_trigger(
1609 &self,
1610 id: String,
1611 trigger_type: String,
1612 function_id: String,
1613 config: Value,
1614 ) {
1615 let handler = self
1616 .inner
1617 .trigger_types
1618 .lock_or_recover()
1619 .get(&trigger_type)
1620 .map(|data| data.handler.clone());
1621
1622 let iii = self.clone();
1623
1624 tokio::spawn(async move {
1625 let message = if let Some(handler) = handler {
1626 let config = TriggerConfig {
1627 id: id.clone(),
1628 function_id: function_id.clone(),
1629 config,
1630 };
1631
1632 match handler.register_trigger(config).await {
1633 Ok(()) => Message::TriggerRegistrationResult {
1634 id,
1635 trigger_type,
1636 function_id,
1637 error: None,
1638 },
1639 Err(err) => Message::TriggerRegistrationResult {
1640 id,
1641 trigger_type,
1642 function_id,
1643 error: Some(ErrorBody {
1644 code: "trigger_registration_failed".to_string(),
1645 message: err.to_string(),
1646 stacktrace: None,
1647 }),
1648 },
1649 }
1650 } else {
1651 Message::TriggerRegistrationResult {
1652 id,
1653 trigger_type,
1654 function_id,
1655 error: Some(ErrorBody {
1656 code: "trigger_type_not_found".to_string(),
1657 message: "Trigger type not found".to_string(),
1658 stacktrace: None,
1659 }),
1660 }
1661 };
1662
1663 let _ = iii.send_message(message);
1664 });
1665 }
1666}
1667
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use serde_json::json;

    use super::*;
    use crate::{
        InitOptions,
        protocol::{HttpInvocationConfig, HttpMethod, RegisterTriggerInput},
        register_worker,
    };

    /// Engine address passed to `register_worker` by every test.
    const ENGINE_URL: &str = "ws://localhost:1234";

    /// HTTP invocation config shared by the function-registration tests:
    /// POST to a fixed URL, with no timeout, headers or auth.
    fn post_config() -> HttpInvocationConfig {
        HttpInvocationConfig {
            url: "https://example.com/invoke".to_string(),
            method: HttpMethod::Post,
            timeout_ms: None,
            headers: HashMap::new(),
            auth: None,
        }
    }

    /// Registration message carrying only `id`; every optional field is unset.
    fn function_message(id: &str) -> RegisterFunctionMessage {
        RegisterFunctionMessage {
            id: id.to_string(),
            description: None,
            request_format: None,
            response_format: None,
            metadata: None,
            invocation: None,
        }
    }

    /// Registering a trigger stores it; unregistering removes it again.
    #[tokio::test]
    async fn register_trigger_unregister_removes_entry() {
        let iii = register_worker(ENGINE_URL, InitOptions::default());
        let input = RegisterTriggerInput {
            trigger_type: "demo".to_string(),
            function_id: "functions.echo".to_string(),
            config: json!({ "foo": "bar" }),
        };
        let trigger = iii.register_trigger(input).unwrap();

        assert_eq!(iii.inner.triggers.lock().unwrap().len(), 1);

        trigger.unregister();

        assert_eq!(iii.inner.triggers.lock().unwrap().len(), 0);
    }

    /// An HTTP-invoked function is stored under its id and removed on
    /// unregister.
    #[tokio::test]
    async fn register_function_with_http_config_stores_and_unregister_removes() {
        let iii = register_worker(ENGINE_URL, InitOptions::default());
        let config = HttpInvocationConfig {
            timeout_ms: Some(30000),
            ..post_config()
        };

        let func_ref = iii.register_function_with(function_message("external::my_lambda"), config);

        assert_eq!(func_ref.id, "external::my_lambda");
        assert_eq!(iii.inner.functions.lock().unwrap().len(), 1);

        func_ref.unregister();

        assert_eq!(iii.inner.functions.lock().unwrap().len(), 0);
    }

    /// Registering a function with an empty id panics with "id is required".
    #[tokio::test]
    #[should_panic(expected = "id is required")]
    async fn register_function_rejects_empty_id() {
        let iii = register_worker(ENGINE_URL, InitOptions::default());

        iii.register_function_with(function_message(""), post_config());
    }

    /// A trigger call with a 10 ms timeout fails with `IIIError::Timeout` and
    /// leaves no entry behind in the pending-invocation map.
    #[tokio::test]
    async fn invoke_function_times_out_and_clears_pending() {
        let iii = register_worker(ENGINE_URL, InitOptions::default());
        let request = TriggerRequest {
            function_id: "functions.echo".to_string(),
            payload: json!({ "a": 1 }),
            action: None,
            timeout_ms: Some(10),
        };

        let result = iii.trigger(request).await;

        assert!(matches!(result, Err(IIIError::Timeout)));
        assert!(iii.inner.pending.lock().unwrap().is_empty());
    }
}