1use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::error::{ModuleError, Result};
9
/// Upper bound on call nesting: `ModuleCaller::call_with_timeout` refuses to
/// issue a request (returning `ModuleError::MaxCallDepthExceeded`) once the
/// caller's own depth has reached this value.
pub const MAX_CALL_DEPTH: u32 = 5;
11
/// A request for one module to invoke a method on another module.
///
/// Implements `PartialEq`/`Eq` so requests can be compared by value, e.g.
/// when asserting on what was placed on the request channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InternalRequest {
    /// Name of the module that should handle the request.
    pub target_module: String,
    /// Name of the method to invoke on `target_module`.
    pub target_method: String,
    /// Opaque argument bytes, passed through unchanged.
    pub args: Vec<u8>,
    /// Trace identifier of the caller that issued the request.
    pub caller_trace_id: u128,
    /// Point in time after which the request is no longer wanted;
    /// `None` means no deadline is attached.
    pub deadline: Option<std::time::Instant>,
    /// Nesting depth of this request within its call chain.
    pub depth: u32,
}
21
/// Outcome of an internal module call.
///
/// The constructors below populate exactly one of the two fields: `data` for
/// a successful call, `error` for a failed one. Implements `PartialEq`/`Eq`
/// so responses can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InternalResponse {
    /// Payload bytes of a successful call.
    pub data: Option<Vec<u8>>,
    /// `(code, message)` pair describing a failed call.
    pub error: Option<(i32, String)>,
}

impl InternalResponse {
    /// Builds a successful response carrying `data` and no error.
    pub fn success(data: Vec<u8>) -> Self {
        Self {
            data: Some(data),
            error: None,
        }
    }

    /// Builds a failed response carrying `(code, message)` and no payload.
    pub fn error(code: i32, message: String) -> Self {
        Self {
            data: None,
            error: Some((code, message)),
        }
    }
}
43
44pub struct ModuleCaller {
45 tx: Arc<tokio::sync::mpsc::Sender<InternalRequest>>,
46 depth: u32,
47 trace_id: u128,
48 span_id: u64,
49}
50
51impl Clone for ModuleCaller {
52 fn clone(&self) -> Self {
53 ModuleCaller {
54 tx: Arc::clone(&self.tx),
55 depth: self.depth,
56 trace_id: self.trace_id,
57 span_id: self.span_id,
58 }
59 }
60}
61
62impl ModuleCaller {
63 pub fn new(
64 tx: tokio::sync::mpsc::Sender<InternalRequest>,
65 depth: u32,
66 trace_id: u128,
67 span_id: u64,
68 ) -> Self {
69 ModuleCaller {
70 tx: Arc::new(tx),
71 depth,
72 trace_id,
73 span_id,
74 }
75 }
76
77 pub fn depth(&self) -> u32 {
78 self.depth
79 }
80
81 pub fn trace_id(&self) -> u128 {
82 self.trace_id
83 }
84
85 pub fn span_id(&self) -> u64 {
86 self.span_id
87 }
88
89 pub async fn call(&self, module: &str, method: &str, args: &[u8]) -> Result<Vec<u8>> {
90 self.call_with_timeout(module, method, args, Duration::from_secs(30)).await
91 }
92
93 pub async fn call_with_timeout(
94 &self,
95 module: &str,
96 method: &str,
97 args: &[u8],
98 timeout: Duration,
99 ) -> Result<Vec<u8>> {
100 if self.depth >= MAX_CALL_DEPTH {
101 return Err(ModuleError::MaxCallDepthExceeded);
102 }
103
104 let deadline = Some(std::time::Instant::now() + timeout);
105
106 let request = InternalRequest {
107 target_module: module.to_string(),
108 target_method: method.to_string(),
109 args: args.to_vec(),
110 caller_trace_id: self.trace_id,
111 deadline,
112 depth: self.depth + 1,
113 };
114
115 let response = self.send_request(request, timeout).await?;
116
117 match response {
118 InternalResponse {
119 data: Some(data),
120 error: None,
121 } => Ok(data),
122 InternalResponse {
123 data: _,
124 error: Some((code, msg)),
125 } => {
126 if code == -404 {
127 Err(ModuleError::ModuleNotFound(msg))
128 } else if code == -408 {
129 Err(ModuleError::Timeout(timeout))
130 } else {
131 Err(ModuleError::CallFailed(msg))
132 }
133 }
134 _ => Err(ModuleError::CallFailed("unexpected response".to_string())),
135 }
136 }
137
138 async fn send_request(
139 &self,
140 request: InternalRequest,
141 timeout: Duration,
142 ) -> Result<InternalResponse> {
143 let tx = self.tx.as_ref();
144
145 let send_future = tx.send(request);
146
147 match tokio::time::timeout(timeout, send_future).await {
148 Ok(Ok(())) => Ok(InternalResponse::success(vec![])),
149 Ok(Err(_)) => Err(ModuleError::ServiceUnavailable),
150 Err(_) => Err(ModuleError::Timeout(timeout)),
151 }
152 }
153
154 pub fn for_nested_call(&self, span_id: u64) -> ModuleCaller {
155 ModuleCaller {
156 tx: Arc::clone(&self.tx),
157 depth: self.depth + 1,
158 trace_id: self.trace_id,
159 span_id,
160 }
161 }
162}
163
164pub fn call_future(
165 caller: &ModuleCaller,
166 module: String,
167 method: String,
168 args: Vec<u8>,
169) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
170 Box::pin(async move { caller.call(&module, &method, &args).await })
171}