apex_sdk/
advanced.rs

1//! Advanced features for Apex SDK
2//!
3//! This module provides advanced blockchain interaction features including:
4//! - Event subscriptions and monitoring
5//! - Parallel transaction execution
6//! - Transaction batching
7//! - Block monitoring
8
9use crate::error::{Error, Result};
10use apex_sdk_types::{Event, EventFilter};
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13
14/// Event subscription manager
15#[derive(Clone)]
16pub struct EventSubscription {
17    filter: EventFilter,
18    sender: broadcast::Sender<Event>,
19    active: Arc<RwLock<bool>>,
20}
21
22impl EventSubscription {
23    /// Create a new event subscription
24    pub fn new(filter: EventFilter) -> Self {
25        let (sender, _) = broadcast::channel(1000);
26        Self {
27            filter,
28            sender,
29            active: Arc::new(RwLock::new(true)),
30        }
31    }
32
33    /// Subscribe to events
34    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
35        self.sender.subscribe()
36    }
37
38    /// Emit an event
39    pub async fn emit(&self, event: Event) -> Result<()> {
40        if !*self.active.read().await {
41            return Err(Error::Other("Subscription is not active".to_string()));
42        }
43
44        if self.matches_filter(&event) {
45            self.sender
46                .send(event)
47                .map_err(|e| Error::Other(format!("Failed to send event: {}", e)))?;
48        }
49
50        Ok(())
51    }
52
53    /// Check if event matches filter
54    fn matches_filter(&self, event: &Event) -> bool {
55        // Filter by event names
56        if let Some(ref names) = self.filter.event_names {
57            if !names.contains(&event.name) {
58                return false;
59            }
60        }
61
62        // Filter by block range
63        if let Some(block_number) = event.block_number {
64            if let Some(from_block) = self.filter.from_block {
65                if block_number < from_block {
66                    return false;
67                }
68            }
69            if let Some(to_block) = self.filter.to_block {
70                if block_number > to_block {
71                    return false;
72                }
73            }
74        }
75
76        true
77    }
78
79    /// Stop the subscription
80    pub async fn stop(&self) {
81        *self.active.write().await = false;
82    }
83
84    /// Check if subscription is active
85    pub async fn is_active(&self) -> bool {
86        *self.active.read().await
87    }
88}
89
90/// Block subscription for monitoring new blocks
91#[derive(Clone)]
92pub struct BlockSubscription {
93    sender: broadcast::Sender<BlockInfo>,
94    active: Arc<RwLock<bool>>,
95}
96
97/// Block information
98#[derive(Debug, Clone)]
99pub struct BlockInfo {
100    /// Block number
101    pub number: u64,
102    /// Block hash
103    pub hash: String,
104    /// Parent hash
105    pub parent_hash: String,
106    /// Timestamp
107    pub timestamp: u64,
108    /// Number of transactions
109    pub tx_count: u32,
110}
111
112impl BlockSubscription {
113    /// Create a new block subscription
114    pub fn new() -> Self {
115        let (sender, _) = broadcast::channel(100);
116        Self {
117            sender,
118            active: Arc::new(RwLock::new(true)),
119        }
120    }
121
122    /// Subscribe to new blocks
123    pub fn subscribe(&self) -> broadcast::Receiver<BlockInfo> {
124        self.sender.subscribe()
125    }
126
127    /// Emit a new block
128    pub async fn emit(&self, block: BlockInfo) -> Result<()> {
129        if !*self.active.read().await {
130            return Err(Error::Other("Subscription is not active".to_string()));
131        }
132
133        self.sender
134            .send(block)
135            .map_err(|e| Error::Other(format!("Failed to send block: {}", e)))?;
136
137        Ok(())
138    }
139
140    /// Stop the subscription
141    pub async fn stop(&self) {
142        *self.active.write().await = false;
143    }
144
145    /// Check if subscription is active
146    pub async fn is_active(&self) -> bool {
147        *self.active.read().await
148    }
149}
150
151impl Default for BlockSubscription {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157/// Parallel transaction executor
158pub struct ParallelExecutor {
159    max_concurrent: usize,
160}
161
162impl ParallelExecutor {
163    /// Create a new parallel executor
164    pub fn new(max_concurrent: usize) -> Self {
165        Self { max_concurrent }
166    }
167
168    /// Execute transactions in parallel
169    pub async fn execute<F, Fut, T>(&self, transactions: Vec<F>) -> Vec<Result<T>>
170    where
171        F: FnOnce() -> Fut + Send + 'static,
172        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
173        T: Send + 'static,
174    {
175        let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
176        let mut handles = vec![];
177
178        for tx in transactions {
179            let permit = semaphore.clone();
180            let handle = tokio::spawn(async move {
181                let _permit = permit.acquire().await.unwrap();
182                tx().await
183            });
184            handles.push(handle);
185        }
186
187        let mut results = vec![];
188        for handle in handles {
189            match handle.await {
190                Ok(result) => results.push(result),
191                Err(e) => results.push(Err(Error::Other(format!("Task failed: {}", e)))),
192            }
193        }
194
195        results
196    }
197
198    /// Execute transactions in parallel with timeout
199    pub async fn execute_with_timeout<F, Fut, T>(
200        &self,
201        transactions: Vec<F>,
202        timeout: std::time::Duration,
203    ) -> Vec<Result<T>>
204    where
205        F: FnOnce() -> Fut + Send + 'static,
206        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
207        T: Send + 'static,
208    {
209        let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
210        let mut handles = vec![];
211
212        for tx in transactions {
213            let permit = semaphore.clone();
214            let handle = tokio::spawn(async move {
215                let _permit = permit.acquire().await.unwrap();
216                match tokio::time::timeout(timeout, tx()).await {
217                    Ok(result) => result,
218                    Err(_) => Err(Error::transaction("Transaction timeout")),
219                }
220            });
221            handles.push(handle);
222        }
223
224        let mut results = vec![];
225        for handle in handles {
226            match handle.await {
227                Ok(result) => results.push(result),
228                Err(e) => results.push(Err(Error::Other(format!("Task failed: {}", e)))),
229            }
230        }
231
232        results
233    }
234}
235
236impl Default for ParallelExecutor {
237    fn default() -> Self {
238        Self::new(10)
239    }
240}
241
242/// Transaction batch builder
243#[derive(Default)]
244pub struct TransactionBatch {
245    transactions: Vec<Vec<u8>>,
246}
247
248impl TransactionBatch {
249    /// Create a new transaction batch
250    pub fn new() -> Self {
251        Self::default()
252    }
253
254    /// Add a transaction to the batch
255    pub fn add(&mut self, tx: Vec<u8>) -> &mut Self {
256        self.transactions.push(tx);
257        self
258    }
259
260    /// Add multiple transactions
261    pub fn add_many(&mut self, txs: Vec<Vec<u8>>) -> &mut Self {
262        self.transactions.extend(txs);
263        self
264    }
265
266    /// Get the number of transactions
267    pub fn len(&self) -> usize {
268        self.transactions.len()
269    }
270
271    /// Check if batch is empty
272    pub fn is_empty(&self) -> bool {
273        self.transactions.is_empty()
274    }
275
276    /// Clear the batch
277    pub fn clear(&mut self) {
278        self.transactions.clear();
279    }
280
281    /// Get all transactions
282    pub fn transactions(&self) -> &[Vec<u8>] {
283        &self.transactions
284    }
285
286    /// Take all transactions and clear the batch
287    pub fn take(&mut self) -> Vec<Vec<u8>> {
288        std::mem::take(&mut self.transactions)
289    }
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use std::time::Duration;
296
297    #[tokio::test]
298    async fn test_event_subscription() {
299        let filter = EventFilter {
300            event_names: Some(vec!["Transfer".to_string()]),
301            addresses: None,
302            from_block: None,
303            to_block: None,
304        };
305
306        let subscription = EventSubscription::new(filter);
307        let mut receiver = subscription.subscribe();
308
309        let event = Event {
310            name: "Transfer".to_string(),
311            data: serde_json::json!({"from": "0x123", "to": "0x456", "amount": 100}),
312            block_number: Some(100),
313            tx_hash: Some("0xabc".to_string()),
314            index: Some(0),
315        };
316
317        subscription.emit(event.clone()).await.unwrap();
318
319        let received = receiver.recv().await.unwrap();
320        assert_eq!(received.name, "Transfer");
321        assert_eq!(received.block_number, Some(100));
322    }
323
324    #[tokio::test]
325    async fn test_block_subscription() {
326        let subscription = BlockSubscription::new();
327        let mut receiver = subscription.subscribe();
328
329        let block = BlockInfo {
330            number: 100,
331            hash: "0xabc".to_string(),
332            parent_hash: "0x123".to_string(),
333            timestamp: 1234567890,
334            tx_count: 10,
335        };
336
337        subscription.emit(block.clone()).await.unwrap();
338
339        let received = receiver.recv().await.unwrap();
340        assert_eq!(received.number, 100);
341        assert_eq!(received.hash, "0xabc");
342    }
343
344    #[tokio::test]
345    async fn test_parallel_executor() {
346        let executor = ParallelExecutor::new(5);
347
348        let transactions: Vec<_> = (0..10)
349            .map(|i| {
350                move || async move {
351                    tokio::time::sleep(Duration::from_millis(10)).await;
352                    Ok::<_, Error>(i * 2)
353                }
354            })
355            .collect();
356
357        let results = executor.execute(transactions).await;
358
359        assert_eq!(results.len(), 10);
360        for (i, result) in results.iter().enumerate() {
361            assert_eq!(result.as_ref().unwrap(), &(i * 2));
362        }
363    }
364
365    #[tokio::test]
366    async fn test_parallel_executor_with_timeout() {
367        let executor = ParallelExecutor::new(5);
368
369        let transactions: Vec<_> = (0..5)
370            .map(|i| {
371                move || async move {
372                    if i == 2 {
373                        tokio::time::sleep(Duration::from_secs(2)).await;
374                    } else {
375                        tokio::time::sleep(Duration::from_millis(10)).await;
376                    }
377                    Ok::<_, Error>(i)
378                }
379            })
380            .collect();
381
382        let results = executor
383            .execute_with_timeout(transactions, Duration::from_millis(100))
384            .await;
385
386        assert_eq!(results.len(), 5);
387        assert!(results[2].is_err()); // Should timeout
388    }
389
390    #[test]
391    fn test_transaction_batch() {
392        let mut batch = TransactionBatch::new();
393
394        batch.add(vec![1, 2, 3]);
395        batch.add(vec![4, 5, 6]);
396
397        assert_eq!(batch.len(), 2);
398        assert!(!batch.is_empty());
399
400        let txs = batch.take();
401        assert_eq!(txs.len(), 2);
402        assert!(batch.is_empty());
403    }
404
405    #[tokio::test]
406    async fn test_subscription_stop() {
407        let subscription = BlockSubscription::new();
408
409        assert!(subscription.is_active().await);
410
411        subscription.stop().await;
412
413        assert!(!subscription.is_active().await);
414
415        let block = BlockInfo {
416            number: 100,
417            hash: "0xabc".to_string(),
418            parent_hash: "0x123".to_string(),
419            timestamp: 1234567890,
420            tx_count: 10,
421        };
422
423        assert!(subscription.emit(block).await.is_err());
424    }
425}