Skip to main content

percli_core/agent/
feed.rs

1use serde::Deserialize;
2use std::io::BufRead;
3use std::path::PathBuf;
4
5/// A single price observation in the feed.
6#[derive(Debug, Clone, Deserialize)]
7pub struct PriceTick {
8    pub oracle: u64,
9    pub slot: u64,
10}
11
12/// Source of price data for the agent tick loop.
13#[derive(Debug, Clone, Deserialize)]
14#[serde(tag = "type", rename_all = "snake_case")]
15pub enum FeedConfig {
16    Inline {
17        prices: Vec<PriceTick>,
18    },
19    Csv {
20        path: PathBuf,
21    },
22    Stdin,
23    Pyth {
24        rpc_url: String,
25        feed_id: String,
26        #[serde(default = "default_poll_ms")]
27        poll_ms: u64,
28        #[serde(default = "default_max_ticks")]
29        max_ticks: u64,
30    },
31}
32
33fn default_poll_ms() -> u64 {
34    2000
35}
36
37fn default_max_ticks() -> u64 {
38    1000
39}
40
41impl Default for FeedConfig {
42    fn default() -> Self {
43        Self::Inline {
44            prices: vec![PriceTick {
45                oracle: 1000,
46                slot: 100,
47            }],
48        }
49    }
50}
51
52impl FeedConfig {
53    /// Convert the feed config into a boxed iterator of price ticks.
54    ///
55    /// For CSV and stdin feeds, parse errors are silently skipped.
56    pub fn into_tick_iter(self) -> anyhow::Result<Box<dyn Iterator<Item = PriceTick>>> {
57        match self {
58            FeedConfig::Inline { prices } => Ok(Box::new(prices.into_iter())),
59            FeedConfig::Csv { path } => {
60                let content = std::fs::read_to_string(&path).map_err(|e| {
61                    anyhow::anyhow!("failed to read feed CSV {}: {}", path.display(), e)
62                })?;
63                let ticks: Vec<PriceTick> = content
64                    .lines()
65                    .filter(|line| !line.starts_with('#') && !line.is_empty())
66                    .filter_map(|line| {
67                        let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
68                        if parts.len() >= 2 {
69                            let oracle = parts[0].parse().ok()?;
70                            let slot = parts[1].parse().ok()?;
71                            Some(PriceTick { oracle, slot })
72                        } else {
73                            None
74                        }
75                    })
76                    .collect();
77                Ok(Box::new(ticks.into_iter()))
78            }
79            FeedConfig::Stdin => {
80                let reader = std::io::stdin().lock();
81                let ticks: Vec<PriceTick> = reader
82                    .lines()
83                    .map_while(Result::ok)
84                    .filter(|line| !line.trim().is_empty())
85                    .filter_map(|line| serde_json::from_str(&line).ok())
86                    .collect();
87                Ok(Box::new(ticks.into_iter()))
88            }
89            #[cfg(feature = "pyth")]
90            FeedConfig::Pyth {
91                rpc_url,
92                feed_id,
93                poll_ms,
94                max_ticks,
95            } => {
96                use pyth_sdk_solana::load_price_feed_from_account_info;
97                use solana_client::rpc_client::RpcClient;
98                use solana_sdk::commitment_config::CommitmentConfig;
99
100                let client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
101                let feed_pubkey: solana_sdk::pubkey::Pubkey = feed_id
102                    .parse()
103                    .map_err(|_| anyhow::anyhow!("invalid Pyth feed pubkey: {}", feed_id))?;
104
105                let mut ticks = Vec::new();
106                let mut last_slot = 0u64;
107
108                for _ in 0..max_ticks {
109                    match client.get_account(&feed_pubkey) {
110                        Ok(account) => {
111                            let mut data = account.data.as_slice();
112                            if let Ok(feed) = load_price_feed_from_account_info(&mut data) {
113                                if let Some(price) = feed.get_price_no_older_than(60) {
114                                    let slot = client.get_slot().unwrap_or(last_slot + 1);
115                                    // Convert Pyth price (with exponent) to u64
116                                    let oracle = if price.expo >= 0 {
117                                        (price.price as u64)
118                                            .saturating_mul(10u64.saturating_pow(price.expo as u32))
119                                    } else {
120                                        (price.price as u64)
121                                            / 10u64.saturating_pow((-price.expo) as u32)
122                                    };
123                                    if slot > last_slot {
124                                        ticks.push(PriceTick { oracle, slot });
125                                        last_slot = slot;
126                                    }
127                                }
128                            }
129                        }
130                        Err(e) => {
131                            eprintln!("pyth feed error: {e}");
132                        }
133                    }
134                    std::thread::sleep(std::time::Duration::from_millis(poll_ms));
135                }
136                Ok(Box::new(ticks.into_iter()))
137            }
138            #[cfg(not(feature = "pyth"))]
139            FeedConfig::Pyth { .. } => {
140                anyhow::bail!(
141                    "Pyth feed requires the `pyth` feature. \
142                     Install with: cargo install percli --features pyth"
143                )
144            }
145        }
146    }
147}