1use std::collections::{HashMap, HashSet};
14use std::sync::{Arc, OnceLock};
15
16use async_trait::async_trait;
17use crate::chain_client::{ChainBlock, ChainClient};
18use crate::error::TransportError;
19use crate::method_safety::MethodSafety;
20use crate::request::{JsonRpcRequest, JsonRpcResponse};
21use crate::transport::{HealthStatus, RpcTransport};
22
23pub fn classify_cosmos_method(method: &str) -> MethodSafety {
36 if cosmos_unsafe_methods().contains(method) {
37 MethodSafety::Unsafe
38 } else if cosmos_idempotent_methods().contains(method) {
39 MethodSafety::Idempotent
40 } else {
41 MethodSafety::Safe
42 }
43}
44
45pub fn is_cosmos_safe_to_retry(method: &str) -> bool {
47 classify_cosmos_method(method) == MethodSafety::Safe
48}
49
50pub fn is_cosmos_safe_to_dedup(method: &str) -> bool {
52 classify_cosmos_method(method) == MethodSafety::Safe
53}
54
55pub fn is_cosmos_cacheable(method: &str) -> bool {
57 classify_cosmos_method(method) == MethodSafety::Safe
58}
59
/// Lazily-built set of method names that classify as unsafe.
///
/// The set is created on first use and the same instance is returned on
/// every later call.
fn cosmos_unsafe_methods() -> &'static HashSet<&'static str> {
    static UNSAFE: OnceLock<HashSet<&'static str>> = OnceLock::new();
    UNSAFE.get_or_init(|| HashSet::from(["broadcast_tx_async"]))
}
70
/// Lazily-built set of method names that classify as idempotent.
///
/// The set is created on first use and the same instance is returned on
/// every later call.
fn cosmos_idempotent_methods() -> &'static HashSet<&'static str> {
    static IDEMPOTENT: OnceLock<HashSet<&'static str>> = OnceLock::new();
    IDEMPOTENT.get_or_init(|| HashSet::from(["broadcast_tx_sync", "broadcast_tx_commit"]))
}
82
/// Per-method cost table with a fallback cost for methods it does not list.
#[derive(Debug, Clone)]
pub struct CosmosCuCostTable {
    /// Explicit cost per method name.
    costs: HashMap<String, u32>,
    /// Cost reported for any method without an explicit entry.
    default_cost: u32,
}

impl CosmosCuCostTable {
    /// Builds the stock table: a fallback cost of 15 plus the built-in
    /// per-method entries listed below.
    pub fn defaults() -> Self {
        const STOCK_COSTS: &[(&str, u32)] = &[
            ("status", 5),
            ("health", 5),
            ("net_info", 10),
            ("block", 20),
            ("block_results", 30),
            ("blockchain", 25),
            ("commit", 15),
            ("validators", 15),
            ("genesis", 50),
            ("tx", 15),
            ("tx_search", 50),
            ("block_search", 50),
            ("abci_query", 20),
            ("broadcast_tx_sync", 10),
            ("broadcast_tx_async", 10),
            ("broadcast_tx_commit", 50),
            ("unconfirmed_txs", 20),
            ("num_unconfirmed_txs", 5),
            ("consensus_state", 10),
            ("dump_consensus_state", 30),
        ];

        Self {
            costs: STOCK_COSTS
                .iter()
                .map(|&(name, cost)| (name.to_string(), cost))
                .collect(),
            default_cost: 15,
        }
    }

    /// Creates an empty table in which every method costs `default_cost`.
    pub fn new(default_cost: u32) -> Self {
        let costs = HashMap::new();
        Self {
            costs,
            default_cost,
        }
    }

    /// Sets (or overwrites) the cost recorded for `method`.
    pub fn set_cost(&mut self, method: &str, cost: u32) {
        let key = method.to_string();
        self.costs.insert(key, cost);
    }

    /// Looks up the cost of `method`, falling back to the table's default
    /// cost when no explicit entry exists.
    pub fn cost_for(&self, method: &str) -> u32 {
        match self.costs.get(method) {
            Some(&cost) => cost,
            None => self.default_cost,
        }
    }
}

impl Default for CosmosCuCostTable {
    /// Same table as [`CosmosCuCostTable::defaults`].
    fn default() -> Self {
        Self::defaults()
    }
}
150
/// Built-in list of Cosmos mainnet RPC endpoint URLs.
pub fn cosmos_mainnet_endpoints() -> &'static [&'static str] {
    const ENDPOINTS: &[&str] = &[
        "https://rpc.cosmos.network:26657",
        "https://cosmos-rpc.polkachu.com",
        "https://rpc-cosmoshub.blockapsis.com",
    ];
    ENDPOINTS
}
163
/// Built-in list of Cosmos testnet RPC endpoint URLs.
pub fn cosmos_testnet_endpoints() -> &'static [&'static str] {
    const ENDPOINTS: &[&str] = &[
        "https://rpc.sentry-01.theta-testnet.polypore.xyz",
        "https://rpc.state-sync-01.theta-testnet.polypore.xyz",
    ];
    ENDPOINTS
}
171
/// Built-in list of Osmosis mainnet RPC endpoint URLs.
pub fn osmosis_mainnet_endpoints() -> &'static [&'static str] {
    const ENDPOINTS: &[&str] = &[
        "https://rpc.osmosis.zone",
        "https://osmosis-rpc.polkachu.com",
    ];
    ENDPOINTS
}
179
180pub struct CosmosTransport {
189 inner: Arc<dyn RpcTransport>,
190}
191
192impl CosmosTransport {
193 pub fn new(inner: Arc<dyn RpcTransport>) -> Self {
195 Self { inner }
196 }
197
198 pub fn inner(&self) -> &Arc<dyn RpcTransport> {
200 &self.inner
201 }
202}
203
204#[async_trait]
205impl RpcTransport for CosmosTransport {
206 async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
207 self.inner.send(req).await
208 }
209
210 async fn send_batch(
211 &self,
212 reqs: Vec<JsonRpcRequest>,
213 ) -> Result<Vec<JsonRpcResponse>, TransportError> {
214 self.inner.send_batch(reqs).await
215 }
216
217 fn health(&self) -> HealthStatus {
218 self.inner.health()
219 }
220
221 fn url(&self) -> &str {
222 self.inner.url()
223 }
224}
225
226pub struct CosmosChainClient {
237 transport: Arc<dyn RpcTransport>,
238 chain_id: String,
239}
240
241impl CosmosChainClient {
242 pub fn new(transport: Arc<dyn RpcTransport>, chain_id: impl Into<String>) -> Self {
244 Self {
245 transport,
246 chain_id: chain_id.into(),
247 }
248 }
249}
250
251#[async_trait]
252impl ChainClient for CosmosChainClient {
253 async fn get_head_height(&self) -> Result<u64, TransportError> {
254 let req = JsonRpcRequest::new(1, "status", vec![]);
255 let resp = self.transport.send(req).await?;
256 let result = resp.into_result().map_err(TransportError::Rpc)?;
257
258 let height_str = result["result"]["sync_info"]["latest_block_height"]
259 .as_str()
260 .or_else(|| result["sync_info"]["latest_block_height"].as_str())
261 .unwrap_or("0");
262 height_str.parse::<u64>().map_err(|e| {
263 TransportError::Other(format!("invalid cosmos block height: {e}"))
264 })
265 }
266
267 async fn get_block_by_height(
268 &self,
269 height: u64,
270 ) -> Result<Option<ChainBlock>, TransportError> {
271 let req = JsonRpcRequest::new(
272 1,
273 "block",
274 vec![serde_json::json!({ "height": height.to_string() })],
275 );
276 let resp = self.transport.send(req).await?;
277 let result = resp.into_result().map_err(TransportError::Rpc)?;
278
279 let block_data = if result["result"]["block"].is_object() {
281 &result["result"]["block"]
282 } else if result["block"].is_object() {
283 &result["block"]
284 } else {
285 return Ok(None);
286 };
287
288 let header = &block_data["header"];
289 let hash = result["result"]["block_id"]["hash"]
290 .as_str()
291 .or_else(|| result["block_id"]["hash"].as_str())
292 .unwrap_or_default()
293 .to_string();
294 let parent_hash = header["last_block_id"]["hash"]
295 .as_str()
296 .unwrap_or_default()
297 .to_string();
298
299 let time_str = header["time"].as_str().unwrap_or("");
301 let timestamp = parse_rfc3339_to_unix(time_str);
302
303 let tx_count = block_data["data"]["txs"]
304 .as_array()
305 .map(|a| a.len() as u32)
306 .unwrap_or(0);
307
308 Ok(Some(ChainBlock {
309 height,
310 hash,
311 parent_hash,
312 timestamp,
313 tx_count,
314 }))
315 }
316
317 fn chain_id(&self) -> &str {
318 &self.chain_id
319 }
320
321 fn chain_family(&self) -> &str {
322 "cosmos"
323 }
324
325 async fn health_check(&self) -> Result<bool, TransportError> {
326 let req = JsonRpcRequest::new(1, "health", vec![]);
327 let resp = self.transport.send(req).await?;
328 let _result = resp.into_result().map_err(TransportError::Rpc)?;
330 Ok(true)
331 }
332}
333
/// Converts an RFC 3339 timestamp into whole seconds since the Unix epoch.
///
/// Accepted shape: `YYYY-MM-DDTHH:MM:SS`, optionally followed by a
/// fractional-seconds part (`.123…`, discarded) and then by `Z` or a numeric
/// UTC offset (`+HH:MM` / `-HH:MM`). A timestamp without any zone suffix is
/// treated as UTC. The `T` and `Z` markers may be lower-case.
///
/// Returns `0` when the input cannot be interpreted: wrong shape, non-digit
/// fields, a year outside `1970..=9999`, or an out-of-range month, day, hour,
/// minute, second or offset. Note that `0` is therefore also the result for
/// the epoch itself.
fn parse_rfc3339_to_unix(time_str: &str) -> i64 {
    try_parse_rfc3339(time_str).unwrap_or(0)
}

/// Fallible core of [`parse_rfc3339_to_unix`]; `None` means "not a valid timestamp".
fn try_parse_rfc3339(input: &str) -> Option<i64> {
    // "YYYY-MM-DDTHH:MM:SS" (19 bytes) is the shortest accepted form.
    if input.len() < 19 {
        return None;
    }

    let (date, rest) = input.split_once(['T', 't'])?;

    // Everything from the first zone marker onwards is the zone designator.
    let (clock, offset_secs) = match rest.find(['Z', 'z', '+', '-']) {
        Some(at) => {
            let (clock, zone) = rest.split_at(at);
            (clock, parse_utc_offset(zone)?)
        }
        None => (rest, 0),
    };
    // Fractional seconds are truncated, not rounded.
    let clock = clock.split_once('.').map_or(clock, |(whole, _fraction)| whole);

    let [year, month, day] = parse_triple(date, '-')?;
    let [hour, minute, second] = parse_triple(clock, ':')?;

    // Range checks keep the arithmetic below bounded and panic-free.
    if !(1970..=9999).contains(&year) || !(1..=12).contains(&month) {
        return None;
    }
    if day == 0 || day > days_in_month(year, month) {
        return None;
    }
    // `second == 60` is tolerated so that leap seconds do not get rejected.
    if hour > 23 || minute > 59 || second > 60 {
        return None;
    }

    let days_before_year: i64 = (1970..year)
        .map(|y| if is_leap_year(y) { 366_i64 } else { 365_i64 })
        .sum();
    let days_before_month: i64 = (1..month)
        .map(|m| i64::from(days_in_month(year, m)))
        .sum();
    let days = days_before_year + days_before_month + i64::from(day - 1);

    let local_secs =
        days * 86_400 + i64::from(hour) * 3_600 + i64::from(minute) * 60 + i64::from(second);
    // Local time = UTC + offset, so subtract the offset to get back to UTC.
    Some(local_secs - offset_secs)
}

/// Parses a zone designator (`Z`, `+HH:MM` or `-HH:MM`) into seconds east of UTC.
fn parse_utc_offset(zone: &str) -> Option<i64> {
    if zone.eq_ignore_ascii_case("z") {
        return Some(0);
    }
    let sign: i64 = match zone.as_bytes().first()? {
        b'+' => 1,
        b'-' => -1,
        _ => return None,
    };
    // The first byte is an ASCII sign, so slicing at 1 is a char boundary.
    let (hours, minutes) = zone[1..].split_once(':')?;
    let (hours, minutes) = (parse_digits(hours)?, parse_digits(minutes)?);
    if hours > 23 || minutes > 59 {
        return None;
    }
    Some(sign * (i64::from(hours) * 3_600 + i64::from(minutes) * 60))
}

/// Splits `text` on `sep` and parses exactly three all-digit fields.
fn parse_triple(text: &str, sep: char) -> Option<[u32; 3]> {
    let mut fields = text.split(sep).map(parse_digits);
    let triple = [fields.next()??, fields.next()??, fields.next()??];
    fields.next().is_none().then_some(triple)
}

/// Parses a non-empty run of ASCII digits; signs and whitespace are rejected.
fn parse_digits(field: &str) -> Option<u32> {
    if !field.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // An empty field passes the digit check above but fails to parse here.
    field.parse().ok()
}

/// Number of days in `month` (1-based) of `year` in the Gregorian calendar.
fn days_in_month(year: u32, month: u32) -> u32 {
    match month {
        4 | 6 | 9 | 11 => 30,
        2 if is_leap_year(year) => 29,
        2 => 28,
        _ => 31,
    }
}

/// Gregorian leap-year rule: divisible by 4 but not by 100, or divisible by 400.
fn is_leap_year(y: u32) -> bool {
    (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400)
}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request::RpcId;
    use serde_json::Value;
    use std::sync::Mutex;

    /// In-memory transport that replays canned responses in order and records
    /// the method name of every request it receives.
    struct MockTransport {
        url: String,
        responses: Mutex<Vec<JsonRpcResponse>>,
        recorded: Mutex<Vec<String>>,
    }

    impl MockTransport {
        /// Creates a mock that will hand out `responses` front to back.
        fn new(responses: Vec<JsonRpcResponse>) -> Self {
            let url = "mock://cosmos".to_string();
            Self {
                url,
                responses: Mutex::new(responses),
                recorded: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl RpcTransport for MockTransport {
        async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
            self.recorded.lock().unwrap().push(req.method.clone());

            let mut queue = self.responses.lock().unwrap();
            if queue.is_empty() {
                return Err(TransportError::Other("no more mock responses".into()));
            }
            Ok(queue.remove(0))
        }

        fn url(&self) -> &str {
            &self.url
        }
    }

    /// Wraps `result` in a successful JSON-RPC response with id `1`.
    fn ok_response(result: Value) -> JsonRpcResponse {
        JsonRpcResponse {
            jsonrpc: "2.0".to_string(),
            id: RpcId::Number(1),
            result: Some(result),
            error: None,
        }
    }

    /// Builds a chain client backed by a mock that replays `responses`.
    fn client_with(responses: Vec<JsonRpcResponse>, chain_id: &str) -> CosmosChainClient {
        let transport = Arc::new(MockTransport::new(responses));
        CosmosChainClient::new(transport, chain_id)
    }

    #[test]
    fn classify_safe_methods() {
        let safe_methods = [
            "block",
            "block_results",
            "validators",
            "status",
            "tx_search",
            "abci_query",
        ];
        for method in safe_methods {
            assert_eq!(classify_cosmos_method(method), MethodSafety::Safe);
        }
    }

    #[test]
    fn classify_idempotent_methods() {
        for method in ["broadcast_tx_sync", "broadcast_tx_commit"] {
            assert_eq!(classify_cosmos_method(method), MethodSafety::Idempotent);
        }
    }

    #[test]
    fn classify_unsafe_methods() {
        let safety = classify_cosmos_method("broadcast_tx_async");
        assert_eq!(safety, MethodSafety::Unsafe);
    }

    #[test]
    fn unknown_method_defaults_safe() {
        let safety = classify_cosmos_method("some_future_method");
        assert_eq!(safety, MethodSafety::Safe);
    }

    #[test]
    fn cu_cost_defaults() {
        let table = CosmosCuCostTable::defaults();
        let expected = [
            ("status", 5),
            ("block", 20),
            ("tx_search", 50),
            ("unknown_method", 15),
        ];
        for (method, cost) in expected {
            assert_eq!(table.cost_for(method), cost);
        }
    }

    #[test]
    fn cu_cost_custom() {
        let mut table = CosmosCuCostTable::new(10);
        table.set_cost("block", 100);

        assert_eq!(table.cost_for("block"), 100);
        assert_eq!(table.cost_for("status"), 10);
    }

    #[test]
    fn retry_dedup_cache_helpers() {
        // Retry
        assert!(is_cosmos_safe_to_retry("block"));
        assert!(!is_cosmos_safe_to_retry("broadcast_tx_async"));
        // Dedup
        assert!(is_cosmos_safe_to_dedup("status"));
        assert!(!is_cosmos_safe_to_dedup("broadcast_tx_sync"));
        // Cache
        assert!(is_cosmos_cacheable("tx_search"));
        assert!(!is_cosmos_cacheable("broadcast_tx_commit"));
    }

    #[test]
    fn endpoints_not_empty() {
        let lists = [
            cosmos_mainnet_endpoints(),
            cosmos_testnet_endpoints(),
            osmosis_mainnet_endpoints(),
        ];
        for list in lists {
            assert!(!list.is_empty());
        }
    }

    #[test]
    fn parse_rfc3339() {
        assert_eq!(parse_rfc3339_to_unix("2024-01-01T00:00:00Z"), 1704067200);
    }

    #[test]
    fn parse_rfc3339_with_nanos() {
        assert_eq!(
            parse_rfc3339_to_unix("2024-01-01T00:00:00.123456789Z"),
            1704067200
        );
    }

    #[test]
    fn parse_rfc3339_invalid() {
        for input in ["invalid", ""] {
            assert_eq!(parse_rfc3339_to_unix(input), 0);
        }
    }

    #[tokio::test]
    async fn cosmos_get_head_height() {
        let status_payload = serde_json::json!({
            "result": {
                "sync_info": {
                    "latest_block_height": "19500000"
                }
            }
        });
        let client = client_with(vec![ok_response(status_payload)], "cosmoshub-4");

        let height = client.get_head_height().await.unwrap();
        assert_eq!(height, 19500000);
    }

    #[tokio::test]
    async fn cosmos_get_block() {
        let block_payload = serde_json::json!({
            "result": {
                "block_id": {
                    "hash": "ABC123DEF"
                },
                "block": {
                    "header": {
                        "height": "100",
                        "time": "2024-01-01T00:00:00Z",
                        "last_block_id": {
                            "hash": "PARENT_HASH"
                        }
                    },
                    "data": {
                        "txs": ["tx1", "tx2"]
                    }
                }
            }
        });
        let client = client_with(vec![ok_response(block_payload)], "cosmoshub-4");

        let block = client.get_block_by_height(100).await.unwrap().unwrap();
        assert_eq!(block.height, 100);
        assert_eq!(block.hash, "ABC123DEF");
        assert_eq!(block.parent_hash, "PARENT_HASH");
        assert_eq!(block.tx_count, 2);
        assert_eq!(block.timestamp, 1704067200);
    }

    #[tokio::test]
    async fn cosmos_health_check() {
        let client = client_with(vec![ok_response(serde_json::json!({}))], "cosmoshub-4");

        let healthy = client.health_check().await.unwrap();
        assert!(healthy);
    }

    #[tokio::test]
    async fn cosmos_chain_metadata() {
        let client = client_with(vec![], "osmosis-1");

        assert_eq!(client.chain_id(), "osmosis-1");
        assert_eq!(client.chain_family(), "cosmos");
    }

    #[tokio::test]
    async fn cosmos_transport_delegates() {
        let canned = vec![ok_response(serde_json::json!("ok"))];
        let inner = Arc::new(MockTransport::new(canned));
        let transport = CosmosTransport::new(inner.clone());
        assert_eq!(transport.url(), "mock://cosmos");

        let resp = transport
            .send(JsonRpcRequest::new(1, "health", vec![]))
            .await
            .unwrap();
        assert!(resp.is_ok());

        let recorded_calls = inner.recorded.lock().unwrap().len();
        assert_eq!(recorded_calls, 1);
    }
}