ddk_node/
lib.rs

1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::Builder;
10use ddk::oracle::kormir::KormirOracleClient;
11use ddk::storage::sled::SledStorage;
12use ddk::transport::lightning::LightningTransport;
13use ddk::util::serialize_contract;
14use ddk::DlcDevKit;
15use ddk::{Oracle, Storage, Transport};
16use ddk_manager::contract::contract_input::ContractInput;
17use ddk_manager::Oracle as DlcOracle;
18use ddk_manager::Storage as DlcStorage;
19use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
20use ddkrpc::{
21    AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse,
22    GetWalletTransactionsRequest, GetWalletTransactionsResponse, ListContractsRequest,
23    ListContractsResponse, ListOffersRequest, ListOffersResponse, ListOraclesRequest,
24    ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest, ListUtxosResponse,
25    NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest, OracleAnnouncementsResponse,
26    Peer, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse, WalletBalanceRequest,
27    WalletBalanceResponse, WalletSyncRequest, WalletSyncResponse,
28};
29use ddkrpc::{InfoRequest, InfoResponse};
30use opts::NodeOpts;
31use std::str::FromStr;
32use std::sync::Arc;
33use tonic::transport::Server;
34use tonic::Request;
35use tonic::Response;
36use tonic::Status;
37use tonic::{async_trait, Code};
38
39type Ddk = DlcDevKit<LightningTransport, SledStorage, KormirOracleClient>;
40
41#[derive(Clone)]
42pub struct DdkNode {
43    pub node: Arc<Ddk>,
44}
45
46impl DdkNode {
47    pub fn new(ddk: Ddk) -> Self {
48        Self {
49            node: Arc::new(ddk),
50        }
51    }
52
53    pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
54        let storage_path = match opts.storage_dir {
55            Some(storage) => storage,
56            None => homedir::my_home()
57                .expect("Provide a directory for ddk.")
58                .unwrap()
59                .join(".ddk")
60                .join("default-ddk"),
61        };
62        let network = Network::from_str(&opts.network)?;
63        std::fs::create_dir_all(storage_path.clone())?;
64
65        let seed_bytes = crate::seed::xprv_from_path(storage_path.clone(), network)?;
66
67        tracing::info!("Starting DDK node.");
68
69        let transport = Arc::new(LightningTransport::new(
70            &seed_bytes.private_key.secret_bytes(),
71            opts.listening_port,
72        )?);
73
74        let storage = Arc::new(SledStorage::new(
75            storage_path.join("sled_db").to_str().unwrap(),
76        )?);
77
78        // let oracle = Arc::new(P2PDOracleClient::new(&oracle_host).await?);
79        let oracle = Arc::new(KormirOracleClient::new(&opts.oracle_host, None).await?);
80
81        let mut builder = Builder::new();
82        builder.set_seed_bytes(seed_bytes.private_key.secret_bytes());
83        builder.set_esplora_host(opts.esplora_host);
84        builder.set_network(network);
85        builder.set_transport(transport.clone());
86        builder.set_storage(storage.clone());
87        builder.set_oracle(oracle.clone());
88
89        let ddk: Ddk = builder.finish().await?;
90
91        ddk.start()?;
92        let node = DdkNode::new(ddk);
93        let node_stop = node.node.clone();
94        let server = Server::builder()
95            .add_service(DdkRpcServer::new(node))
96            .serve_with_shutdown(opts.grpc_host.parse()?, async {
97                tokio::signal::ctrl_c()
98                    .await
99                    .expect("Failed to install Ctrl+C signal handler");
100                let _ = node_stop.stop();
101            });
102
103        server.await?;
104
105        Ok(())
106    }
107}
108
109#[async_trait]
110impl DdkRpc for DdkNode {
111    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
112    async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
113        tracing::info!("Request for node info.");
114        let pubkey = self.node.transport.node_id.to_string();
115        let transport = self.node.transport.name();
116        let oracle = self.node.oracle.name();
117        let response = InfoResponse {
118            pubkey,
119            transport,
120            oracle,
121        };
122        Ok(Response::new(response))
123    }
124
125    #[tracing::instrument(skip(self, request), name = "grpc_server")]
126    async fn send_offer(
127        &self,
128        request: Request<SendOfferRequest>,
129    ) -> Result<Response<SendOfferResponse>, Status> {
130        tracing::info!("Request to send offer.");
131        let SendOfferRequest {
132            contract_input,
133            counter_party,
134        } = request.into_inner();
135        let contract_input: ContractInput =
136            serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
137        let mut oracle_announcements = Vec::new();
138        for info in &contract_input.contract_infos {
139            let announcement = self
140                .node
141                .oracle
142                .get_announcement(&info.oracles.event_id)
143                .await
144                .unwrap();
145            oracle_announcements.push(announcement)
146        }
147
148        let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
149        let offer_msg = self
150            .node
151            .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
152            .await
153            .map_err(|e| {
154                Status::new(
155                    Code::Cancelled,
156                    format!(
157                        "Contract offer could not be sent to counterparty. error={:?}",
158                        e
159                    ),
160                )
161            })?;
162
163        let offer_dlc =
164            serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
165        Ok(Response::new(SendOfferResponse { offer_dlc }))
166    }
167
168    #[tracing::instrument(skip(self, request), name = "grpc_server")]
169    async fn accept_offer(
170        &self,
171        request: Request<AcceptOfferRequest>,
172    ) -> Result<Response<AcceptOfferResponse>, Status> {
173        tracing::info!("Request to accept offer.");
174        let mut contract_id = [0u8; 32];
175        let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
176        contract_id.copy_from_slice(&contract_id_bytes);
177        let (contract_id, counter_party, accept_dlc) = self
178            .node
179            .accept_dlc_offer(contract_id)
180            .await
181            .map_err(|_| Status::new(Code::Cancelled, "Contract could not be accepted."))?;
182
183        let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
184            Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
185        })?;
186
187        Ok(Response::new(AcceptOfferResponse {
188            contract_id,
189            counter_party,
190            accept_dlc,
191        }))
192    }
193
194    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
195    async fn new_address(
196        &self,
197        _request: Request<NewAddressRequest>,
198    ) -> Result<Response<NewAddressResponse>, Status> {
199        tracing::info!("Request for new wallet address");
200        let address = self.node.wallet.new_external_address().unwrap().to_string();
201        let response = NewAddressResponse { address };
202        Ok(Response::new(response))
203    }
204
205    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
206    async fn list_offers(
207        &self,
208        _request: Request<ListOffersRequest>,
209    ) -> Result<Response<ListOffersResponse>, Status> {
210        tracing::info!("Request for offers to the node.");
211        let offers = self.node.storage.get_contract_offers().unwrap();
212        let offers: Vec<Vec<u8>> = offers
213            .iter()
214            .map(|offer| serde_json::to_vec(offer).unwrap())
215            .collect();
216
217        Ok(Response::new(ListOffersResponse { offers }))
218    }
219
220    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
221    async fn wallet_balance(
222        &self,
223        _request: Request<WalletBalanceRequest>,
224    ) -> Result<Response<WalletBalanceResponse>, Status> {
225        tracing::info!("Request for wallet balance.");
226        let wallet_balance = self.node.balance().unwrap();
227
228        let response = WalletBalanceResponse {
229            confirmed: wallet_balance.confirmed.to_sat(),
230            foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
231            change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
232            contract_balance: wallet_balance.contract_pnl,
233        };
234        Ok(Response::new(response))
235    }
236
237    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
238    async fn get_wallet_transactions(
239        &self,
240        _request: Request<GetWalletTransactionsRequest>,
241    ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
242        tracing::info!("Request for all wallet transactions.");
243        let wallet_transactions = self.node.wallet.get_transactions().unwrap();
244        let transactions: Vec<Vec<u8>> = wallet_transactions
245            .iter()
246            .map(|t| serde_json::to_vec(&t).unwrap())
247            .collect();
248        Ok(Response::new(GetWalletTransactionsResponse {
249            transactions,
250        }))
251    }
252
253    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
254    async fn list_utxos(
255        &self,
256        _request: Request<ListUtxosRequest>,
257    ) -> Result<Response<ListUtxosResponse>, Status> {
258        tracing::info!("Request to list all wallet utxos");
259        let utxos = self.node.wallet.list_utxos().unwrap();
260        let utxos: Vec<Vec<u8>> = utxos
261            .iter()
262            .map(|utxo| serde_json::to_vec(utxo).unwrap())
263            .collect();
264        Ok(Response::new(ListUtxosResponse { utxos }))
265    }
266
267    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
268    async fn list_peers(
269        &self,
270        _request: Request<ListPeersRequest>,
271    ) -> Result<Response<ListPeersResponse>, Status> {
272        tracing::info!("List peers request");
273        let peers = self.node.transport.peer_manager.list_peers();
274        let peers = peers
275            .iter()
276            .map(|peer| {
277                let host = match &peer.socket_address {
278                    Some(h) => h.to_string(),
279                    None => "".to_string(),
280                };
281                let pubkey = peer.counterparty_node_id.to_string();
282                Peer { pubkey, host }
283            })
284            .collect::<Vec<Peer>>();
285
286        Ok(Response::new(ListPeersResponse { peers }))
287    }
288
289    #[tracing::instrument(skip(self, request), name = "grpc_server")]
290    async fn connect_peer(
291        &self,
292        request: Request<ConnectRequest>,
293    ) -> Result<Response<ConnectResponse>, Status> {
294        let ConnectRequest { pubkey, host } = request.into_inner();
295        let pubkey = PublicKey::from_str(&pubkey).unwrap();
296        self.node.transport.connect_outbound(pubkey, &host).await;
297        Ok(Response::new(ConnectResponse {}))
298    }
299
300    async fn list_oracles(
301        &self,
302        _request: Request<ListOraclesRequest>,
303    ) -> Result<Response<ListOraclesResponse>, Status> {
304        let pubkey = self.node.oracle.get_pubkey().await.unwrap().to_string();
305        let name = self.node.oracle.name();
306        Ok(Response::new(ListOraclesResponse { name, pubkey }))
307    }
308
309    async fn list_contracts(
310        &self,
311        _request: Request<ListContractsRequest>,
312    ) -> Result<Response<ListContractsResponse>, Status> {
313        let contracts = self
314            .node
315            .storage
316            .get_contracts()
317            .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
318        let contract_bytes: Vec<Vec<u8>> = contracts
319            .iter()
320            .map(|contract| serialize_contract(contract).unwrap())
321            .collect();
322        Ok(Response::new(ListContractsResponse {
323            contracts: contract_bytes,
324        }))
325    }
326
327    async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
328        let SendRequest {
329            address,
330            amount,
331            fee_rate,
332        } = request.into_inner();
333        let address = Address::from_str(&address).unwrap().assume_checked();
334        let amount = Amount::from_sat(amount);
335        let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
336            Some(f) => f,
337            None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
338        };
339        let txn = self
340            .node
341            .wallet
342            .send_to_address(address, amount, fee_rate)
343            .await;
344        if let Ok(tx) = txn {
345            Ok(Response::new(SendResponse {
346                txid: tx.to_string(),
347            }))
348        } else {
349            Err(Status::new(Code::Internal, "Transaction sending failed."))
350        }
351    }
352
353    async fn oracle_announcements(
354        &self,
355        _request: Request<OracleAnnouncementsRequest>,
356    ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
357        let announcements: Vec<Vec<u8>> = self
358            .node
359            .storage
360            .get_marketplace_announcements()
361            // TODO: fails if no announcements
362            .unwrap()
363            .iter()
364            .map(|ann| serde_json::to_vec(ann).unwrap())
365            .collect();
366        Ok(Response::new(OracleAnnouncementsResponse { announcements }))
367    }
368
369    async fn wallet_sync(
370        &self,
371        _request: Request<WalletSyncRequest>,
372    ) -> Result<Response<WalletSyncResponse>, Status> {
373        self.node
374            .wallet
375            .sync()
376            .await
377            .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
378        Ok(Response::new(WalletSyncResponse {}))
379    }
380}