vls_frontend/
frontend.rs

1use async_trait::async_trait;
2use bitcoind_client::dummy::DummyPersistentTxooSource;
3use lightning_signer::txoo::source::{FileSource, HttpSource, Source};
4use std::collections::HashSet;
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8use tokio::{task, time};
9use url::Url;
10
11use lightning_signer::bitcoin::hash_types::FilterHeader;
12use lightning_signer::bitcoin::{BlockHash, Network};
13use log::info;
14
15use triggered::Listener;
16
17use crate::{chain_follower::ChainFollower, ChainTrack, ChainTrackDirectory};
18
/// A factory for TXOO [`Source`] trait objects.
///
/// Implementors must be `Send + Sync`; in this file the factory is held as an
/// `Arc<dyn SourceFactory>` by `Frontend`.
#[async_trait]
pub trait SourceFactory: Send + Sync {
    /// Get a new TXOO Source
    ///
    /// `start_block`, `block_hash` and `filter_header` are forwarded as a
    /// checkpoint by the dummy factory in this file; the file and HTTP
    /// factories ignore them.
    // NOTE(review): presumably these describe the block the source should
    // start from (height, hash, filter header) - confirm against the callers.
    async fn get_source(
        &self,
        start_block: u32,
        block_hash: BlockHash,
        filter_header: FilterHeader,
    ) -> Box<dyn Source>;

    /// The Bitcoin network this factory is configured for.
    fn get_network(&self) -> Network;
}
31
32pub struct FileSourceFactory {
33    datadir: PathBuf,
34    network: Network,
35}
36
37impl FileSourceFactory {
38    /// Create a new FileSourceFactory
39    pub fn new<P: Into<PathBuf>>(datadir: P, network: Network) -> Self {
40        Self { datadir: datadir.into(), network }
41    }
42}
43
44#[async_trait]
45impl SourceFactory for FileSourceFactory {
46    async fn get_source(
47        &self,
48        _start_block: u32,
49        _block_hash: BlockHash,
50        _filter_header: FilterHeader,
51    ) -> Box<dyn Source> {
52        let file_source = FileSource::new(self.datadir.clone());
53        let source_network = file_source.oracle_setup().await.network;
54        assert_eq!(source_network, self.network);
55        Box::new(file_source)
56    }
57
58    fn get_network(&self) -> Network {
59        self.network
60    }
61}
62
63pub struct HTTPSourceFactory {
64    url: Url,
65    network: Network,
66}
67
68impl HTTPSourceFactory {
69    /// Create a new HTTPSourceFactory
70    pub fn new(url: Url, network: Network) -> Self {
71        Self { url: url, network }
72    }
73}
74
75#[async_trait]
76impl SourceFactory for HTTPSourceFactory {
77    async fn get_source(
78        &self,
79        _start_block: u32,
80        _block_hash: BlockHash,
81        _filter_header: FilterHeader,
82    ) -> Box<dyn Source> {
83        let http_source =
84            HttpSource::new(self.url.clone()).await.expect("couldn't connect to http source");
85        let source_network = http_source.oracle_setup().await.network;
86        assert_eq!(source_network, self.network);
87        Box::new(http_source)
88    }
89
90    fn get_network(&self) -> Network {
91        self.network
92    }
93}
94
95pub struct DummySourceFactory {
96    datadir: PathBuf,
97    network: Network,
98}
99
100impl DummySourceFactory {
101    /// Create a new DummySourceFactory
102    pub fn new<P: Into<PathBuf>>(datadir: P, network: Network) -> Self {
103        Self { datadir: datadir.into(), network }
104    }
105}
106
107#[async_trait]
108impl SourceFactory for DummySourceFactory {
109    async fn get_source(
110        &self,
111        start_block: u32,
112        block_hash: BlockHash,
113        filter_header: FilterHeader,
114    ) -> Box<dyn Source> {
115        Box::new(DummyPersistentTxooSource::from_checkpoint(
116            self.datadir.clone(),
117            self.network,
118            start_block,
119            block_hash,
120            filter_header.clone(),
121        ))
122    }
123
124    fn get_network(&self) -> Network {
125        self.network
126    }
127}
128
/// Starts a chain follower for each tracker reported by a tracker directory.
///
/// Cloning is cheap for the shared parts: the directory, the tracker-id set
/// and the source factory are all behind `Arc`, so clones observe the same
/// set of already-handled trackers.
#[derive(Clone)]
pub struct Frontend {
    /// Directory that is asked for the current list of trackers.
    directory: Arc<dyn ChainTrackDirectory>,
    /// URL passed to `ChainFollower::new` for every follower.
    rpc_url: Url,
    /// Ids of trackers for which a follower start has already been initiated.
    tracker_ids: Arc<Mutex<HashSet<Vec<u8>>>>,
    /// Factory passed to `ChainFollower::new` for every follower.
    source_factory: Arc<dyn SourceFactory>,
    /// Listener that ends the polling loop once it completes.
    shutdown_signal: Listener,
}
137
138impl Frontend {
139    /// Create a new Frontend
140    pub fn new(
141        signer: Arc<dyn ChainTrackDirectory>,
142        source_factory: Arc<dyn SourceFactory>,
143        rpc_url: Url,
144        shutdown_signal: Listener,
145    ) -> Frontend {
146        let tracker_ids = Arc::new(Mutex::new(HashSet::new()));
147        Frontend { directory: signer, source_factory, rpc_url, tracker_ids, shutdown_signal }
148    }
149
150    pub fn directory(&self) -> Arc<dyn ChainTrackDirectory> {
151        Arc::clone(&self.directory)
152    }
153
154    /// Start a task which creates a chain follower for each existing tracker
155    pub fn start(&self) {
156        let s = self.clone();
157        let shutdown_signal = self.shutdown_signal.clone();
158        task::spawn(async move {
159            s.start_loop(shutdown_signal).await;
160        });
161        info!("frontend started");
162    }
163
164    async fn start_loop(&self, shutdown_signal: Listener) {
165        let mut interval = time::interval(Duration::from_secs(1));
166        loop {
167            let shutdown_signal_clone = shutdown_signal.clone();
168            tokio::select! {
169                _ = interval.tick() => self.handle_new_trackers().await,
170                _ = shutdown_signal_clone => break,
171            }
172        }
173        info!("frontend stopped");
174    }
175
176    async fn handle_new_trackers(&self) {
177        for tracker in self.directory.trackers().await {
178            let tracker_id = tracker.id().await;
179            {
180                let mut lock = self.tracker_ids.lock().unwrap();
181                if lock.contains(&tracker_id) {
182                    continue;
183                }
184                lock.insert(tracker_id);
185            }
186            self.start_follower(tracker).await;
187        }
188    }
189
190    /// Start a chain follower for a specific tracker
191    pub async fn start_follower(&self, tracker: Arc<dyn ChainTrack>) {
192        assert_eq!(tracker.network(), self.source_factory.get_network());
193        let cf_arc = ChainFollower::new(tracker, self.source_factory.as_ref(), &self.rpc_url).await;
194        ChainFollower::start(cf_arc).await;
195    }
196}