1use async_trait::async_trait;
2use bitcoind_client::dummy::DummyPersistentTxooSource;
3use lightning_signer::txoo::source::{FileSource, HttpSource, Source};
4use std::collections::HashSet;
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8use tokio::{task, time};
9use url::Url;
10
11use lightning_signer::bitcoin::hash_types::FilterHeader;
12use lightning_signer::bitcoin::{BlockHash, Network};
13use log::info;
14
15use triggered::Listener;
16
17use crate::{chain_follower::ChainFollower, ChainTrack, ChainTrackDirectory};
18
19#[async_trait]
20pub trait SourceFactory: Send + Sync {
21 async fn get_source(
23 &self,
24 start_block: u32,
25 block_hash: BlockHash,
26 filter_header: FilterHeader,
27 ) -> Box<dyn Source>;
28
29 fn get_network(&self) -> Network;
30}
31
32pub struct FileSourceFactory {
33 datadir: PathBuf,
34 network: Network,
35}
36
37impl FileSourceFactory {
38 pub fn new<P: Into<PathBuf>>(datadir: P, network: Network) -> Self {
40 Self { datadir: datadir.into(), network }
41 }
42}
43
44#[async_trait]
45impl SourceFactory for FileSourceFactory {
46 async fn get_source(
47 &self,
48 _start_block: u32,
49 _block_hash: BlockHash,
50 _filter_header: FilterHeader,
51 ) -> Box<dyn Source> {
52 let file_source = FileSource::new(self.datadir.clone());
53 let source_network = file_source.oracle_setup().await.network;
54 assert_eq!(source_network, self.network);
55 Box::new(file_source)
56 }
57
58 fn get_network(&self) -> Network {
59 self.network
60 }
61}
62
63pub struct HTTPSourceFactory {
64 url: Url,
65 network: Network,
66}
67
68impl HTTPSourceFactory {
69 pub fn new(url: Url, network: Network) -> Self {
71 Self { url: url, network }
72 }
73}
74
75#[async_trait]
76impl SourceFactory for HTTPSourceFactory {
77 async fn get_source(
78 &self,
79 _start_block: u32,
80 _block_hash: BlockHash,
81 _filter_header: FilterHeader,
82 ) -> Box<dyn Source> {
83 let http_source =
84 HttpSource::new(self.url.clone()).await.expect("couldn't connect to http source");
85 let source_network = http_source.oracle_setup().await.network;
86 assert_eq!(source_network, self.network);
87 Box::new(http_source)
88 }
89
90 fn get_network(&self) -> Network {
91 self.network
92 }
93}
94
95pub struct DummySourceFactory {
96 datadir: PathBuf,
97 network: Network,
98}
99
100impl DummySourceFactory {
101 pub fn new<P: Into<PathBuf>>(datadir: P, network: Network) -> Self {
103 Self { datadir: datadir.into(), network }
104 }
105}
106
107#[async_trait]
108impl SourceFactory for DummySourceFactory {
109 async fn get_source(
110 &self,
111 start_block: u32,
112 block_hash: BlockHash,
113 filter_header: FilterHeader,
114 ) -> Box<dyn Source> {
115 Box::new(DummyPersistentTxooSource::from_checkpoint(
116 self.datadir.clone(),
117 self.network,
118 start_block,
119 block_hash,
120 filter_header.clone(),
121 ))
122 }
123
124 fn get_network(&self) -> Network {
125 self.network
126 }
127}
128
129#[derive(Clone)]
130pub struct Frontend {
131 directory: Arc<dyn ChainTrackDirectory>,
132 rpc_url: Url,
133 tracker_ids: Arc<Mutex<HashSet<Vec<u8>>>>,
134 source_factory: Arc<dyn SourceFactory>,
135 shutdown_signal: Listener,
136}
137
138impl Frontend {
139 pub fn new(
141 signer: Arc<dyn ChainTrackDirectory>,
142 source_factory: Arc<dyn SourceFactory>,
143 rpc_url: Url,
144 shutdown_signal: Listener,
145 ) -> Frontend {
146 let tracker_ids = Arc::new(Mutex::new(HashSet::new()));
147 Frontend { directory: signer, source_factory, rpc_url, tracker_ids, shutdown_signal }
148 }
149
150 pub fn directory(&self) -> Arc<dyn ChainTrackDirectory> {
151 Arc::clone(&self.directory)
152 }
153
154 pub fn start(&self) {
156 let s = self.clone();
157 let shutdown_signal = self.shutdown_signal.clone();
158 task::spawn(async move {
159 s.start_loop(shutdown_signal).await;
160 });
161 info!("frontend started");
162 }
163
164 async fn start_loop(&self, shutdown_signal: Listener) {
165 let mut interval = time::interval(Duration::from_secs(1));
166 loop {
167 let shutdown_signal_clone = shutdown_signal.clone();
168 tokio::select! {
169 _ = interval.tick() => self.handle_new_trackers().await,
170 _ = shutdown_signal_clone => break,
171 }
172 }
173 info!("frontend stopped");
174 }
175
176 async fn handle_new_trackers(&self) {
177 for tracker in self.directory.trackers().await {
178 let tracker_id = tracker.id().await;
179 {
180 let mut lock = self.tracker_ids.lock().unwrap();
181 if lock.contains(&tracker_id) {
182 continue;
183 }
184 lock.insert(tracker_id);
185 }
186 self.start_follower(tracker).await;
187 }
188 }
189
190 pub async fn start_follower(&self, tracker: Arc<dyn ChainTrack>) {
192 assert_eq!(tracker.network(), self.source_factory.get_network());
193 let cf_arc = ChainFollower::new(tracker, self.source_factory.as_ref(), &self.rpc_url).await;
194 ChainFollower::start(cf_arc).await;
195 }
196}