1use crate::error::{Error, Result};
10use apex_sdk_types::{Event, EventFilter};
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13
14#[derive(Clone)]
16pub struct EventSubscription {
17 filter: EventFilter,
18 sender: broadcast::Sender<Event>,
19 active: Arc<RwLock<bool>>,
20}
21
22impl EventSubscription {
23 pub fn new(filter: EventFilter) -> Self {
25 let (sender, _) = broadcast::channel(1000);
26 Self {
27 filter,
28 sender,
29 active: Arc::new(RwLock::new(true)),
30 }
31 }
32
33 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
35 self.sender.subscribe()
36 }
37
38 pub async fn emit(&self, event: Event) -> Result<()> {
40 if !*self.active.read().await {
41 return Err(Error::Other("Subscription is not active".to_string()));
42 }
43
44 if self.matches_filter(&event) {
45 self.sender
46 .send(event)
47 .map_err(|e| Error::Other(format!("Failed to send event: {}", e)))?;
48 }
49
50 Ok(())
51 }
52
53 fn matches_filter(&self, event: &Event) -> bool {
55 if let Some(ref names) = self.filter.event_names {
57 if !names.contains(&event.name) {
58 return false;
59 }
60 }
61
62 if let Some(block_number) = event.block_number {
64 if let Some(from_block) = self.filter.from_block {
65 if block_number < from_block {
66 return false;
67 }
68 }
69 if let Some(to_block) = self.filter.to_block {
70 if block_number > to_block {
71 return false;
72 }
73 }
74 }
75
76 true
77 }
78
79 pub async fn stop(&self) {
81 *self.active.write().await = false;
82 }
83
84 pub async fn is_active(&self) -> bool {
86 *self.active.read().await
87 }
88}
89
90#[derive(Clone)]
92pub struct BlockSubscription {
93 sender: broadcast::Sender<BlockInfo>,
94 active: Arc<RwLock<bool>>,
95}
96
/// Summary of a single block as delivered to block subscribers.
///
/// The type is plain data, so it also derives `PartialEq`, `Eq` and `Hash`;
/// this lets blocks be compared in assertions and stored in sets or map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockInfo {
    /// Block number.
    pub number: u64,
    /// Hash of this block, in whatever textual form the producer supplies.
    pub hash: String,
    /// Hash of the parent block, in the same textual form as `hash`.
    pub parent_hash: String,
    /// Block timestamp. NOTE(review): presumably Unix seconds — confirm with the producer.
    pub timestamp: u64,
    /// Number of transactions contained in the block.
    pub tx_count: u32,
}
111
112impl BlockSubscription {
113 pub fn new() -> Self {
115 let (sender, _) = broadcast::channel(100);
116 Self {
117 sender,
118 active: Arc::new(RwLock::new(true)),
119 }
120 }
121
122 pub fn subscribe(&self) -> broadcast::Receiver<BlockInfo> {
124 self.sender.subscribe()
125 }
126
127 pub async fn emit(&self, block: BlockInfo) -> Result<()> {
129 if !*self.active.read().await {
130 return Err(Error::Other("Subscription is not active".to_string()));
131 }
132
133 self.sender
134 .send(block)
135 .map_err(|e| Error::Other(format!("Failed to send block: {}", e)))?;
136
137 Ok(())
138 }
139
140 pub async fn stop(&self) {
142 *self.active.write().await = false;
143 }
144
145 pub async fn is_active(&self) -> bool {
147 *self.active.read().await
148 }
149}
150
151impl Default for BlockSubscription {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
/// Runs batches of asynchronous transactions with a cap on concurrency.
///
/// The executor holds only its configuration, so it is cheap to clone and can
/// be printed with `{:?}` for diagnostics.
#[derive(Debug, Clone)]
pub struct ParallelExecutor {
    /// Upper bound on the number of transactions allowed to run at the same time.
    max_concurrent: usize,
}
161
162impl ParallelExecutor {
163 pub fn new(max_concurrent: usize) -> Self {
165 Self { max_concurrent }
166 }
167
168 pub async fn execute<F, Fut, T>(&self, transactions: Vec<F>) -> Vec<Result<T>>
170 where
171 F: FnOnce() -> Fut + Send + 'static,
172 Fut: std::future::Future<Output = Result<T>> + Send + 'static,
173 T: Send + 'static,
174 {
175 let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
176 let mut handles = vec![];
177
178 for tx in transactions {
179 let permit = semaphore.clone();
180 let handle = tokio::spawn(async move {
181 let _permit = permit.acquire().await.unwrap();
182 tx().await
183 });
184 handles.push(handle);
185 }
186
187 let mut results = vec![];
188 for handle in handles {
189 match handle.await {
190 Ok(result) => results.push(result),
191 Err(e) => results.push(Err(Error::Other(format!("Task failed: {}", e)))),
192 }
193 }
194
195 results
196 }
197
198 pub async fn execute_with_timeout<F, Fut, T>(
200 &self,
201 transactions: Vec<F>,
202 timeout: std::time::Duration,
203 ) -> Vec<Result<T>>
204 where
205 F: FnOnce() -> Fut + Send + 'static,
206 Fut: std::future::Future<Output = Result<T>> + Send + 'static,
207 T: Send + 'static,
208 {
209 let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
210 let mut handles = vec![];
211
212 for tx in transactions {
213 let permit = semaphore.clone();
214 let handle = tokio::spawn(async move {
215 let _permit = permit.acquire().await.unwrap();
216 match tokio::time::timeout(timeout, tx()).await {
217 Ok(result) => result,
218 Err(_) => Err(Error::transaction("Transaction timeout")),
219 }
220 });
221 handles.push(handle);
222 }
223
224 let mut results = vec![];
225 for handle in handles {
226 match handle.await {
227 Ok(result) => results.push(result),
228 Err(e) => results.push(Err(Error::Other(format!("Task failed: {}", e)))),
229 }
230 }
231
232 results
233 }
234}
235
236impl Default for ParallelExecutor {
237 fn default() -> Self {
238 Self::new(10)
239 }
240}
241
/// An ordered collection of raw (byte-encoded) transactions.
///
/// Transactions are kept in insertion order. The batch derives `Debug`, `Clone`,
/// `PartialEq` and `Eq` so it can be logged, snapshotted and compared.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransactionBatch {
    /// Raw transactions, in the order they were added.
    transactions: Vec<Vec<u8>>,
}

impl TransactionBatch {
    /// Creates an empty batch.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends one transaction and returns `self` so calls can be chained.
    pub fn add(&mut self, tx: Vec<u8>) -> &mut Self {
        self.transactions.push(tx);
        self
    }

    /// Appends every transaction in `txs`, preserving their order, and returns
    /// `self` so calls can be chained.
    pub fn add_many(&mut self, txs: Vec<Vec<u8>>) -> &mut Self {
        self.transactions.extend(txs);
        self
    }

    /// Returns the number of transactions in the batch.
    pub fn len(&self) -> usize {
        self.transactions.len()
    }

    /// Returns `true` when the batch holds no transactions.
    pub fn is_empty(&self) -> bool {
        self.transactions.is_empty()
    }

    /// Removes every transaction from the batch.
    pub fn clear(&mut self) {
        self.transactions.clear();
    }

    /// Borrows the transactions in insertion order.
    pub fn transactions(&self) -> &[Vec<u8>] {
        &self.transactions
    }

    /// Moves all transactions out of the batch, leaving it empty.
    pub fn take(&mut self) -> Vec<Vec<u8>> {
        std::mem::take(&mut self.transactions)
    }
}
291
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Block fixture shared by the block-subscription tests.
    fn sample_block() -> BlockInfo {
        BlockInfo {
            number: 100,
            hash: "0xabc".to_string(),
            parent_hash: "0x123".to_string(),
            timestamp: 1234567890,
            tx_count: 10,
        }
    }

    /// A matching event emitted after subscribing reaches the receiver intact.
    #[tokio::test]
    async fn test_event_subscription() {
        let subscription = EventSubscription::new(EventFilter {
            event_names: Some(vec!["Transfer".to_string()]),
            addresses: None,
            from_block: None,
            to_block: None,
        });
        let mut rx = subscription.subscribe();

        let transfer = Event {
            name: "Transfer".to_string(),
            data: serde_json::json!({"from": "0x123", "to": "0x456", "amount": 100}),
            block_number: Some(100),
            tx_hash: Some("0xabc".to_string()),
            index: Some(0),
        };
        subscription.emit(transfer).await.unwrap();

        let delivered = rx.recv().await.unwrap();
        assert_eq!(delivered.name, "Transfer");
        assert_eq!(delivered.block_number, Some(100));
    }

    /// A block emitted after subscribing reaches the receiver intact.
    #[tokio::test]
    async fn test_block_subscription() {
        let subscription = BlockSubscription::new();
        let mut rx = subscription.subscribe();

        subscription.emit(sample_block()).await.unwrap();

        let delivered = rx.recv().await.unwrap();
        assert_eq!(delivered.number, 100);
        assert_eq!(delivered.hash, "0xabc");
    }

    /// Results come back in submission order even when the limit is below the batch size.
    #[tokio::test]
    async fn test_parallel_executor() {
        let executor = ParallelExecutor::new(5);

        let mut jobs = Vec::new();
        for i in 0..10 {
            jobs.push(move || async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok::<_, Error>(i * 2)
            });
        }

        let results = executor.execute(jobs).await;

        assert_eq!(results.len(), 10);
        for (idx, outcome) in results.iter().enumerate() {
            assert_eq!(outcome.as_ref().unwrap(), &(idx * 2));
        }
    }

    /// Only the transaction that outlives the timeout is reported as an error slot.
    #[tokio::test]
    async fn test_parallel_executor_with_timeout() {
        let executor = ParallelExecutor::new(5);

        let mut jobs = Vec::new();
        for i in 0..5 {
            jobs.push(move || async move {
                // Job 2 sleeps far longer than the 100 ms limit used below.
                let delay = if i == 2 {
                    Duration::from_secs(2)
                } else {
                    Duration::from_millis(10)
                };
                tokio::time::sleep(delay).await;
                Ok::<_, Error>(i)
            });
        }

        let results = executor
            .execute_with_timeout(jobs, Duration::from_millis(100))
            .await;

        assert_eq!(results.len(), 5);
        assert!(results[2].is_err());
    }

    /// `take` hands over every transaction and leaves the batch empty.
    #[test]
    fn test_transaction_batch() {
        let mut batch = TransactionBatch::new();

        batch.add(vec![1, 2, 3]);
        batch.add(vec![4, 5, 6]);

        assert_eq!(batch.len(), 2);
        assert!(!batch.is_empty());

        let drained = batch.take();
        assert_eq!(drained.len(), 2);
        assert!(batch.is_empty());
    }

    /// After `stop`, the subscription reports inactive and rejects new blocks.
    #[tokio::test]
    async fn test_subscription_stop() {
        let subscription = BlockSubscription::new();
        assert!(subscription.is_active().await);

        subscription.stop().await;
        assert!(!subscription.is_active().await);

        let rejected = subscription.emit(sample_block()).await;
        assert!(rejected.is_err());
    }
}