oura/framework/
mod.rs

1//! Internal pipeline framework
2
3use pallas::network::miniprotocols::Point;
4use serde::Deserialize;
5use serde_json::{json, Value as JsonValue};
6use std::collections::VecDeque;
7use std::fmt::Debug;
8use std::path::PathBuf;
9
10pub use crate::cursor::Config as CursorConfig;
11
12// we use UtxoRpc as our canonical representation of a parsed Tx
13pub use pallas::interop::utxorpc::spec::cardano::Block as ParsedBlock;
14pub use pallas::interop::utxorpc::spec::cardano::Tx as ParsedTx;
15
16// we use GenesisValues from Pallas as our ChainConfig
17pub use pallas::ledger::traverse::wellknown::GenesisValues;
18
19pub mod errors;
20pub mod legacy_v1;
21
22pub use errors::*;
23
24#[derive(Clone)]
25pub struct Breadcrumbs {
26    state: VecDeque<Point>,
27    max: usize,
28}
29
30impl Breadcrumbs {
31    pub fn new(max: usize) -> Self {
32        Self {
33            state: Default::default(),
34            max,
35        }
36    }
37
38    pub fn from_points(points: Vec<Point>, max: usize) -> Self {
39        Self {
40            state: VecDeque::from_iter(points),
41            max,
42        }
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.state.is_empty()
47    }
48
49    pub fn track(&mut self, point: Point) {
50        // if we have a rollback, retain only older points
51        self.state
52            .retain(|p| p.slot_or_default() < point.slot_or_default());
53
54        // add the new point we're tracking
55        self.state.push_front(point);
56
57        // if we have too many points, remove the older ones
58        if self.state.len() > self.max {
59            self.state.pop_back();
60        }
61    }
62
63    pub fn points(&self) -> Vec<Point> {
64        self.state.iter().map(Clone::clone).collect()
65    }
66}
67
68#[derive(Deserialize, Clone)]
69#[serde(tag = "type", rename_all = "lowercase")]
70pub enum ChainConfig {
71    Mainnet,
72    Testnet,
73    PreProd,
74    Preview,
75    Custom(GenesisValues),
76}
77
78impl Default for ChainConfig {
79    fn default() -> Self {
80        Self::Mainnet
81    }
82}
83
84impl From<ChainConfig> for GenesisValues {
85    fn from(other: ChainConfig) -> Self {
86        match other {
87            ChainConfig::Mainnet => GenesisValues::mainnet(),
88            ChainConfig::Testnet => GenesisValues::testnet(),
89            ChainConfig::PreProd => GenesisValues::preprod(),
90            ChainConfig::Preview => GenesisValues::preview(),
91            ChainConfig::Custom(x) => x,
92        }
93    }
94}
95
96pub struct Context {
97    pub chain: ChainConfig,
98    pub intersect: IntersectConfig,
99    pub finalize: Option<FinalizeConfig>,
100    pub current_dir: PathBuf,
101    pub breadcrumbs: Breadcrumbs,
102}
103
104#[derive(Debug, Clone)]
105pub enum Record {
106    CborBlock(Vec<u8>),
107    CborTx(Vec<u8>),
108    GenericJson(JsonValue),
109    OuraV1Event(legacy_v1::Event),
110    ParsedTx(ParsedTx),
111    ParsedBlock(ParsedBlock),
112}
113
114impl From<Record> for JsonValue {
115    fn from(value: Record) -> Self {
116        match value {
117            Record::CborBlock(x) => json!({ "hex": hex::encode(x) }),
118            Record::CborTx(x) => json!({ "hex": hex::encode(x) }),
119            Record::ParsedBlock(x) => json!(x),
120            Record::ParsedTx(x) => json!(x),
121            Record::OuraV1Event(x) => json!(x),
122            Record::GenericJson(x) => x,
123        }
124    }
125}
126
127#[derive(Debug, Clone)]
128pub enum ChainEvent {
129    Apply(Point, Record),
130    Undo(Point, Record),
131    Reset(Point),
132}
133
134impl ChainEvent {
135    pub fn apply(point: Point, record: impl Into<Record>) -> gasket::messaging::Message<Self> {
136        gasket::messaging::Message {
137            payload: Self::Apply(point, record.into()),
138        }
139    }
140
141    pub fn undo(point: Point, record: impl Into<Record>) -> gasket::messaging::Message<Self> {
142        gasket::messaging::Message {
143            payload: Self::Undo(point, record.into()),
144        }
145    }
146
147    pub fn reset(point: Point) -> gasket::messaging::Message<Self> {
148        gasket::messaging::Message {
149            payload: Self::Reset(point),
150        }
151    }
152
153    pub fn point(&self) -> &Point {
154        match self {
155            Self::Apply(x, _) => x,
156            Self::Undo(x, _) => x,
157            Self::Reset(x) => x,
158        }
159    }
160
161    pub fn record(&self) -> Option<&Record> {
162        match self {
163            Self::Apply(_, x) => Some(x),
164            Self::Undo(_, x) => Some(x),
165            _ => None,
166        }
167    }
168
169    pub fn map_record(self, f: fn(Record) -> Record) -> Self {
170        match self {
171            Self::Apply(p, x) => Self::Apply(p, f(x)),
172            Self::Undo(p, x) => Self::Undo(p, f(x)),
173            Self::Reset(x) => Self::Reset(x),
174        }
175    }
176
177    pub fn try_map_record<E, F>(self, f: F) -> Result<Self, E>
178    where
179        F: FnOnce(Record) -> Result<Record, E>,
180    {
181        let out = match self {
182            Self::Apply(p, x) => Self::Apply(p, f(x)?),
183            Self::Undo(p, x) => Self::Undo(p, f(x)?),
184            Self::Reset(x) => Self::Reset(x),
185        };
186
187        Ok(out)
188    }
189
190    pub fn try_map_record_to_many<F, E>(self, f: F) -> Result<Vec<Self>, E>
191    where
192        F: FnOnce(Record) -> Result<Vec<Record>, E>,
193    {
194        let out = match self {
195            Self::Apply(p, x) => f(x)?
196                .into_iter()
197                .map(|i| Self::Apply(p.clone(), i))
198                .collect(),
199            Self::Undo(p, x) => f(x)?
200                .into_iter()
201                .map(|i| Self::Undo(p.clone(), i))
202                .collect(),
203            Self::Reset(x) => vec![Self::Reset(x)],
204        };
205
206        Ok(out)
207    }
208}
209
210fn point_to_json(point: Point) -> JsonValue {
211    match &point {
212        pallas::network::miniprotocols::Point::Origin => JsonValue::from("origin"),
213        pallas::network::miniprotocols::Point::Specific(slot, hash) => {
214            json!({ "slot": slot, "hash": hex::encode(hash)})
215        }
216    }
217}
218
219impl From<ChainEvent> for JsonValue {
220    fn from(value: ChainEvent) -> Self {
221        match value {
222            ChainEvent::Apply(point, record) => {
223                json!({
224                    "event": "apply",
225                    "point": point_to_json(point),
226                    "record": JsonValue::from(record.clone())
227                })
228            }
229            ChainEvent::Undo(point, record) => {
230                json!({
231                    "event": "undo",
232                    "point": point_to_json(point),
233                    "record": JsonValue::from(record.clone())
234                })
235            }
236            ChainEvent::Reset(point) => {
237                json!({
238                    "event": "reset",
239                    "point": point_to_json(point)
240                })
241            }
242        }
243    }
244}
245
246pub type SourceOutputPort = gasket::messaging::OutputPort<ChainEvent>;
247pub type FilterInputPort = gasket::messaging::InputPort<ChainEvent>;
248pub type FilterOutputPort = gasket::messaging::OutputPort<ChainEvent>;
249pub type MapperInputPort = gasket::messaging::InputPort<ChainEvent>;
250pub type MapperOutputPort = gasket::messaging::OutputPort<ChainEvent>;
251pub type SinkInputPort = gasket::messaging::InputPort<ChainEvent>;
252pub type SinkCursorPort = gasket::messaging::OutputPort<Point>;
253
254#[derive(Debug, Deserialize, Clone)]
255#[serde(tag = "type", content = "value")]
256pub enum IntersectConfig {
257    Tip,
258    Origin,
259    Point(u64, String),
260    Breadcrumbs(Vec<(u64, String)>),
261}
262
263impl IntersectConfig {
264    pub fn points(&self) -> Option<Vec<Point>> {
265        match self {
266            IntersectConfig::Breadcrumbs(all) => {
267                let mapped = all
268                    .iter()
269                    .map(|(slot, hash)| {
270                        let hash = hex::decode(hash).expect("valid hex hash");
271                        Point::Specific(*slot, hash)
272                    })
273                    .collect();
274
275                Some(mapped)
276            }
277            IntersectConfig::Point(slot, hash) => {
278                let hash = hex::decode(hash).expect("valid hex hash");
279                Some(vec![Point::Specific(*slot, hash)])
280            }
281            _ => None,
282        }
283    }
284}
285
286/// Optional configuration to stop processing new blocks after processing:
287///   1. a block with the given hash
288///   2. the first block on or after a given absolute slot
289///   3. TODO: a total of X blocks
290#[derive(Deserialize, Debug, Clone)]
291pub struct FinalizeConfig {
292    until_hash: Option<String>,
293    max_block_slot: Option<u64>,
294    // max_block_quantity: Option<u64>,
295}
296
297pub fn should_finalize(
298    config: &Option<FinalizeConfig>,
299    last_point: &Point,
300    // block_count: u64,
301) -> bool {
302    let config = match config {
303        Some(x) => x,
304        None => return false,
305    };
306
307    if let Some(expected) = &config.until_hash {
308        if let Point::Specific(_, current) = last_point {
309            return expected == &hex::encode(current);
310        }
311    }
312
313    if let Some(max) = config.max_block_slot {
314        if last_point.slot_or_default() >= max {
315            return true;
316        }
317    }
318
319    // if let Some(max) = config.max_block_quantity {
320    //     if block_count >= max {
321    //         return true;
322    //     }
323    // }
324
325    false
326}