Skip to main content

memlink_msdk/
caller.rs

1//! Module caller for nested invocations via internal channel system.
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::error::{ModuleError, Result};
9
10pub const MAX_CALL_DEPTH: u32 = 5;
11
12#[derive(Debug, Clone)]
13pub struct InternalRequest {
14    pub target_module: String,
15    pub target_method: String,
16    pub args: Vec<u8>,
17    pub caller_trace_id: u128,
18    pub deadline: Option<std::time::Instant>,
19    pub depth: u32,
20}
21
22#[derive(Debug, Clone)]
23pub struct InternalResponse {
24    pub data: Option<Vec<u8>>,
25    pub error: Option<(i32, String)>,
26}
27
28impl InternalResponse {
29    pub fn success(data: Vec<u8>) -> Self {
30        InternalResponse {
31            data: Some(data),
32            error: None,
33        }
34    }
35
36    pub fn error(code: i32, message: String) -> Self {
37        InternalResponse {
38            data: None,
39            error: Some((code, message)),
40        }
41    }
42}
43
44pub struct ModuleCaller {
45    tx: Arc<tokio::sync::mpsc::Sender<InternalRequest>>,
46    depth: u32,
47    trace_id: u128,
48    span_id: u64,
49}
50
51impl Clone for ModuleCaller {
52    fn clone(&self) -> Self {
53        ModuleCaller {
54            tx: Arc::clone(&self.tx),
55            depth: self.depth,
56            trace_id: self.trace_id,
57            span_id: self.span_id,
58        }
59    }
60}
61
62impl ModuleCaller {
63    pub fn new(
64        tx: tokio::sync::mpsc::Sender<InternalRequest>,
65        depth: u32,
66        trace_id: u128,
67        span_id: u64,
68    ) -> Self {
69        ModuleCaller {
70            tx: Arc::new(tx),
71            depth,
72            trace_id,
73            span_id,
74        }
75    }
76
77    pub fn depth(&self) -> u32 {
78        self.depth
79    }
80
81    pub fn trace_id(&self) -> u128 {
82        self.trace_id
83    }
84
85    pub fn span_id(&self) -> u64 {
86        self.span_id
87    }
88
89    pub async fn call(&self, module: &str, method: &str, args: &[u8]) -> Result<Vec<u8>> {
90        self.call_with_timeout(module, method, args, Duration::from_secs(30)).await
91    }
92
93    pub async fn call_with_timeout(
94        &self,
95        module: &str,
96        method: &str,
97        args: &[u8],
98        timeout: Duration,
99    ) -> Result<Vec<u8>> {
100        if self.depth >= MAX_CALL_DEPTH {
101            return Err(ModuleError::MaxCallDepthExceeded);
102        }
103
104        let deadline = Some(std::time::Instant::now() + timeout);
105
106        let request = InternalRequest {
107            target_module: module.to_string(),
108            target_method: method.to_string(),
109            args: args.to_vec(),
110            caller_trace_id: self.trace_id,
111            deadline,
112            depth: self.depth + 1,
113        };
114
115        let response = self.send_request(request, timeout).await?;
116
117        match response {
118            InternalResponse {
119                data: Some(data),
120                error: None,
121            } => Ok(data),
122            InternalResponse {
123                data: _,
124                error: Some((code, msg)),
125            } => {
126                if code == -404 {
127                    Err(ModuleError::ModuleNotFound(msg))
128                } else if code == -408 {
129                    Err(ModuleError::Timeout(timeout))
130                } else {
131                    Err(ModuleError::CallFailed(msg))
132                }
133            }
134            _ => Err(ModuleError::CallFailed("unexpected response".to_string())),
135        }
136    }
137
138    async fn send_request(
139        &self,
140        request: InternalRequest,
141        timeout: Duration,
142    ) -> Result<InternalResponse> {
143        let tx = self.tx.as_ref();
144
145        let send_future = tx.send(request);
146
147        match tokio::time::timeout(timeout, send_future).await {
148            Ok(Ok(())) => Ok(InternalResponse::success(vec![])),
149            Ok(Err(_)) => Err(ModuleError::ServiceUnavailable),
150            Err(_) => Err(ModuleError::Timeout(timeout)),
151        }
152    }
153
154    pub fn for_nested_call(&self, span_id: u64) -> ModuleCaller {
155        ModuleCaller {
156            tx: Arc::clone(&self.tx),
157            depth: self.depth + 1,
158            trace_id: self.trace_id,
159            span_id,
160        }
161    }
162}
163
164pub fn call_future(
165    caller: &ModuleCaller,
166    module: String,
167    method: String,
168    args: Vec<u8>,
169) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
170    Box::pin(async move { caller.call(&module, &method, &args).await })
171}