use crate::transaction::Transaction;
use crate::Result;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
pub type InterpreterFn<T, C> = Box<
dyn Fn(&Transaction, u32, Option<&C>) -> Pin<Box<dyn Future<Output = Option<T>>>> + Send + Sync,
>;
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub struct HistorianConfig<T, C> {
pub debug: bool,
pub history_cache: Option<HashMap<String, Vec<T>>>,
pub interpreter_version: Option<String>,
pub ctx_key_fn: Option<Box<dyn Fn(Option<&C>) -> String + Send + Sync>>,
}
#[allow(clippy::type_complexity)]
pub struct Historian<T, C> {
interpreter: InterpreterFn<T, C>,
debug: bool,
history_cache: Option<tokio::sync::RwLock<HashMap<String, Vec<T>>>>,
interpreter_version: String,
ctx_key_fn: Option<Box<dyn Fn(Option<&C>) -> String + Send + Sync>>,
}
impl<T: Clone + Send + Sync + 'static, C: Send + Sync + 'static> Historian<T, C> {
pub fn new(interpreter: InterpreterFn<T, C>, config: HistorianConfig<T, C>) -> Self {
Self {
interpreter,
debug: config.debug,
history_cache: config.history_cache.map(tokio::sync::RwLock::new),
interpreter_version: config
.interpreter_version
.unwrap_or_else(|| "v1".to_string()),
ctx_key_fn: config.ctx_key_fn,
}
}
pub async fn build_history(
&self,
start_transaction: &Transaction,
context: Option<&C>,
) -> Result<Vec<T>> {
let cache_key = self.cache_key(start_transaction, context);
if let Some(ref cache) = self.history_cache {
let cache_read = cache.read().await;
if let Some(cached) = cache_read.get(&cache_key) {
if self.debug {
eprintln!("[Historian] Cache hit: {}", cache_key);
}
return Ok(cached.clone());
}
}
let mut visited: HashSet<String> = HashSet::new();
let mut results: Vec<T> = Vec::new();
self.traverse(start_transaction, context, &mut visited, &mut results)
.await?;
results.reverse();
if let Some(ref cache) = self.history_cache {
let mut cache_write = cache.write().await;
cache_write.insert(cache_key.clone(), results.clone());
if self.debug {
eprintln!("[Historian] Cached: {}", cache_key);
}
}
Ok(results)
}
fn traverse<'a>(
&'a self,
tx: &'a Transaction,
context: Option<&'a C>,
visited: &'a mut HashSet<String>,
results: &'a mut Vec<T>,
) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>>
where
T: 'a,
C: 'a,
{
Box::pin(async move {
let txid = tx.id();
if visited.contains(&txid) {
if self.debug {
eprintln!("[Historian] Skipping visited: {}", txid);
}
return Ok(());
}
visited.insert(txid.clone());
if self.debug {
eprintln!("[Historian] Processing: {}", txid);
}
for (idx, _output) in tx.outputs.iter().enumerate() {
if let Some(value) = (self.interpreter)(tx, idx as u32, context).await {
results.push(value);
if self.debug {
eprintln!("[Historian] Found value at output {}", idx);
}
}
}
for input in &tx.inputs {
if let Some(ref source_tx) = input.source_transaction {
self.traverse(source_tx, context, visited, results).await?;
} else if self.debug {
eprintln!("[Historian] Input missing source transaction");
}
}
Ok(())
})
}
fn cache_key(&self, tx: &Transaction, context: Option<&C>) -> String {
let txid = tx.id();
let ctx_key = self
.ctx_key_fn
.as_ref()
.map(|f| f(context))
.unwrap_or_default();
format!("{}|{}|{}", self.interpreter_version, txid, ctx_key)
}
}
#[allow(clippy::type_complexity)]
pub struct SyncHistorian<T, C> {
interpreter: Box<dyn Fn(&Transaction, u32, Option<&C>) -> Option<T> + Send + Sync>,
debug: bool,
#[allow(dead_code)]
interpreter_version: String,
}
impl<T: Clone + Send + Sync, C: Send + Sync> SyncHistorian<T, C> {
pub fn new<F>(interpreter: F) -> Self
where
F: Fn(&Transaction, u32, Option<&C>) -> Option<T> + Send + Sync + 'static,
{
Self {
interpreter: Box::new(interpreter),
debug: false,
interpreter_version: "v1".to_string(),
}
}
pub fn with_debug(mut self, debug: bool) -> Self {
self.debug = debug;
self
}
pub fn with_version(mut self, version: impl Into<String>) -> Self {
self.interpreter_version = version.into();
self
}
pub fn build_history(&self, start_transaction: &Transaction, context: Option<&C>) -> Vec<T> {
let mut visited: HashSet<String> = HashSet::new();
let mut results: Vec<T> = Vec::new();
self.traverse(start_transaction, context, &mut visited, &mut results);
results.reverse();
results
}
fn traverse(
&self,
tx: &Transaction,
context: Option<&C>,
visited: &mut HashSet<String>,
results: &mut Vec<T>,
) {
let txid = tx.id();
if visited.contains(&txid) {
return;
}
visited.insert(txid.clone());
if self.debug {
eprintln!("[SyncHistorian] Processing: {}", txid);
}
for (idx, _output) in tx.outputs.iter().enumerate() {
if let Some(value) = (self.interpreter)(tx, idx as u32, context) {
results.push(value);
}
}
for input in &tx.inputs {
if let Some(ref source_tx) = input.source_transaction {
self.traverse(source_tx, context, visited, results);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::script::LockingScript;
use crate::transaction::{TransactionInput, TransactionOutput};
fn create_test_tx(_id_suffix: u8, source: Option<Transaction>) -> Transaction {
let mut tx = Transaction::new();
tx.outputs.push(TransactionOutput::new(
1000,
LockingScript::from_asm("OP_TRUE").unwrap(),
));
if let Some(source_tx) = source {
let mut input = TransactionInput::new(source_tx.id(), 0);
input.source_transaction = Some(Box::new(source_tx));
tx.inputs.push(input);
}
tx
}
#[test]
fn test_sync_historian_single_tx() {
let tx = create_test_tx(1, None);
let historian = SyncHistorian::<u32, ()>::new(|_tx, output_idx, _ctx| Some(output_idx));
let history = historian.build_history(&tx, None);
assert_eq!(history, vec![0]);
}
#[test]
fn test_sync_historian_chain() {
let tx1 = create_test_tx(1, None);
let tx2 = create_test_tx(2, Some(tx1));
let tx3 = create_test_tx(3, Some(tx2));
let historian = SyncHistorian::<String, ()>::new(|tx, _output_idx, _ctx| Some(tx.id()));
let history = historian.build_history(&tx3, None);
assert_eq!(history.len(), 3);
}
#[test]
fn test_sync_historian_filters() {
let tx = create_test_tx(1, None);
let historian = SyncHistorian::<u32, ()>::new(|_tx, output_idx, _ctx| {
if output_idx % 2 == 0 {
Some(output_idx)
} else {
None
}
});
let history = historian.build_history(&tx, None);
assert_eq!(history, vec![0]);
}
#[test]
fn test_sync_historian_with_context() {
let tx = create_test_tx(1, None);
let historian =
SyncHistorian::<u32, u32>::new(|_tx, output_idx, ctx| ctx.map(|c| output_idx + c));
let history_with_ctx = historian.build_history(&tx, Some(&10));
assert_eq!(history_with_ctx, vec![10]);
let history_without_ctx = historian.build_history(&tx, None);
assert!(history_without_ctx.is_empty());
}
#[test]
fn test_sync_historian_prevents_cycles() {
let tx = create_test_tx(1, None);
let historian = SyncHistorian::<u32, ()>::new(|_tx, output_idx, _ctx| Some(output_idx));
let history = historian.build_history(&tx, None);
assert_eq!(history.len(), 1);
}
#[tokio::test]
async fn test_async_historian_basic() {
let tx = create_test_tx(1, None);
let interpreter: InterpreterFn<u32, ()> =
Box::new(|_tx, output_idx, _ctx| Box::pin(async move { Some(output_idx) }));
let historian = Historian::new(interpreter, HistorianConfig::default());
let history = historian.build_history(&tx, None).await.unwrap();
assert_eq!(history, vec![0]);
}
#[tokio::test]
async fn test_async_historian_with_cache() {
let tx = create_test_tx(1, None);
let interpreter: InterpreterFn<u32, ()> =
Box::new(|_tx, output_idx, _ctx| Box::pin(async move { Some(output_idx) }));
let config = HistorianConfig {
debug: false,
history_cache: Some(HashMap::new()),
interpreter_version: Some("v1".to_string()),
ctx_key_fn: None,
};
let historian = Historian::new(interpreter, config);
let history1 = historian.build_history(&tx, None).await.unwrap();
let history2 = historian.build_history(&tx, None).await.unwrap();
assert_eq!(history1, history2);
}
}