ddk_node/
lib.rs

1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::{Builder, SeedConfig};
10use ddk::oracle::kormir::KormirOracleClient;
11use ddk::storage::postgres::PostgresStore;
12use ddk::transport::nostr::NostrDlc;
13use ddk::util::ser::serialize_contract;
14use ddk::DlcDevKit;
15use ddk::{Oracle, Transport};
16use ddk_manager::contract::contract_input::ContractInput;
17use ddk_manager::Oracle as DlcOracle;
18use ddk_manager::Storage as DlcStorage;
19use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
20use ddkrpc::{
21    AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
22    CreateEnumResponse, GetWalletTransactionsRequest, GetWalletTransactionsResponse,
23    ListContractsRequest, ListContractsResponse, ListOffersRequest, ListOffersResponse,
24    ListOraclesRequest, ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest,
25    ListUtxosResponse, NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest,
26    OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse,
27    SyncRequest, SyncResponse, WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest,
28    WalletSyncResponse,
29};
30use ddkrpc::{InfoRequest, InfoResponse};
31use opts::NodeOpts;
32use std::str::FromStr;
33use std::sync::Arc;
34use tonic::transport::Server;
35use tonic::Request;
36use tonic::Response;
37use tonic::Status;
38use tonic::{async_trait, Code};
39
40type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
41
42#[derive(Clone)]
43pub struct DdkNode {
44    pub node: Arc<Ddk>,
45}
46
47impl DdkNode {
48    pub fn new(ddk: Ddk) -> Self {
49        Self {
50            node: Arc::new(ddk),
51        }
52    }
53
54    pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
55        let storage_path = match opts.storage_dir {
56            Some(storage) => storage,
57            None => homedir::my_home()
58                .expect("Provide a directory for ddk.")
59                .unwrap()
60                .join(".ddk")
61                .join("default-ddk"),
62        };
63        let network = Network::from_str(&opts.network)?;
64        std::fs::create_dir_all(storage_path.clone())?;
65
66        let seed_bytes = crate::seed::xprv_from_path(storage_path.clone())?;
67
68        tracing::info!("Starting DDK node.");
69        let transport =
70            Arc::new(NostrDlc::new(&seed_bytes, "wss://nostr.dlcdevkit.com", network).await?);
71
72        let storage = Arc::new(PostgresStore::new(&opts.postgres_url, true, opts.name).await?);
73
74        // let oracle = Arc::new(P2PDOracleClient::new(&oracle_host).await?);
75        let oracle = Arc::new(KormirOracleClient::new(&opts.oracle_host, None).await?);
76
77        let mut builder = Builder::new();
78        builder.set_seed_bytes(SeedConfig::Bytes(seed_bytes))?;
79        builder.set_esplora_host(opts.esplora_host);
80        builder.set_network(network);
81        builder.set_transport(transport.clone());
82        builder.set_storage(storage.clone());
83        builder.set_oracle(oracle.clone());
84
85        let ddk: Ddk = builder.finish().await?;
86
87        ddk.start()?;
88        let node = DdkNode::new(ddk);
89        let node_stop = node.node.clone();
90        let server = Server::builder()
91            .add_service(DdkRpcServer::new(node))
92            .serve_with_shutdown(opts.grpc_host.parse()?, async {
93                tokio::signal::ctrl_c()
94                    .await
95                    .expect("Failed to install Ctrl+C signal handler");
96                let _ = node_stop.stop();
97            });
98
99        server.await?;
100
101        Ok(())
102    }
103}
104
105#[async_trait]
106impl DdkRpc for DdkNode {
107    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
108    async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
109        tracing::info!("Request for node info.");
110        let pubkey = self.node.transport.public_key().to_string();
111        let transport = self.node.transport.name();
112        let oracle = self.node.oracle.name();
113        let response = InfoResponse {
114            pubkey,
115            transport,
116            oracle,
117        };
118        Ok(Response::new(response))
119    }
120
121    #[tracing::instrument(skip(self, request), name = "grpc_server")]
122    async fn send_offer(
123        &self,
124        request: Request<SendOfferRequest>,
125    ) -> Result<Response<SendOfferResponse>, Status> {
126        tracing::info!("Request to send offer.");
127        let SendOfferRequest {
128            contract_input,
129            counter_party,
130        } = request.into_inner();
131        let contract_input: ContractInput =
132            serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
133        let mut oracle_announcements = Vec::new();
134        for info in &contract_input.contract_infos {
135            let announcement = self
136                .node
137                .oracle
138                .get_announcement(&info.oracles.event_id)
139                .await
140                .unwrap();
141            oracle_announcements.push(announcement)
142        }
143
144        let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
145        let offer_msg = self
146            .node
147            .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
148            .await
149            .map_err(|e| {
150                Status::new(
151                    Code::Cancelled,
152                    format!("Contract offer could not be sent to counterparty. error={e}"),
153                )
154            })?;
155
156        let offer_dlc =
157            serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
158        Ok(Response::new(SendOfferResponse { offer_dlc }))
159    }
160
161    #[tracing::instrument(skip(self, request), name = "grpc_server")]
162    async fn accept_offer(
163        &self,
164        request: Request<AcceptOfferRequest>,
165    ) -> Result<Response<AcceptOfferResponse>, Status> {
166        tracing::info!("Request to accept offer.");
167        let mut contract_id = [0u8; 32];
168        let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
169        contract_id.copy_from_slice(&contract_id_bytes);
170        let (contract_id, counter_party, accept_dlc) =
171            self.node.accept_dlc_offer(contract_id).await.map_err(|e| {
172                Status::new(
173                    Code::Cancelled,
174                    format!("Contract could not be accepted. error={e:?}"),
175                )
176            })?;
177
178        let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
179            Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
180        })?;
181
182        Ok(Response::new(AcceptOfferResponse {
183            contract_id,
184            counter_party,
185            accept_dlc,
186        }))
187    }
188
189    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
190    async fn new_address(
191        &self,
192        _request: Request<NewAddressRequest>,
193    ) -> Result<Response<NewAddressResponse>, Status> {
194        tracing::info!("Request for new wallet address");
195        let address = self
196            .node
197            .wallet
198            .new_external_address()
199            .await
200            .unwrap()
201            .to_string();
202        let response = NewAddressResponse { address };
203        Ok(Response::new(response))
204    }
205
206    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
207    async fn list_offers(
208        &self,
209        _request: Request<ListOffersRequest>,
210    ) -> Result<Response<ListOffersResponse>, Status> {
211        tracing::info!("Request for offers to the node.");
212        let offers = self.node.storage.get_contract_offers().await.unwrap();
213        let offers: Vec<Vec<u8>> = offers
214            .iter()
215            .map(|offer| serde_json::to_vec(offer).unwrap())
216            .collect();
217
218        Ok(Response::new(ListOffersResponse { offers }))
219    }
220
221    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
222    async fn wallet_balance(
223        &self,
224        _request: Request<WalletBalanceRequest>,
225    ) -> Result<Response<WalletBalanceResponse>, Status> {
226        tracing::info!("Request for wallet balance.");
227        let wallet_balance = self.node.balance().await.unwrap();
228
229        let response = WalletBalanceResponse {
230            confirmed: wallet_balance.confirmed.to_sat(),
231            foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
232            change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
233            contract_balance: wallet_balance.contract_pnl,
234        };
235        Ok(Response::new(response))
236    }
237
238    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
239    async fn get_wallet_transactions(
240        &self,
241        _request: Request<GetWalletTransactionsRequest>,
242    ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
243        tracing::info!("Request for all wallet transactions.");
244        let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
245        let transactions: Vec<Vec<u8>> = wallet_transactions
246            .iter()
247            .map(|t| serde_json::to_vec(&t).unwrap())
248            .collect();
249        Ok(Response::new(GetWalletTransactionsResponse {
250            transactions,
251        }))
252    }
253
254    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
255    async fn list_utxos(
256        &self,
257        _request: Request<ListUtxosRequest>,
258    ) -> Result<Response<ListUtxosResponse>, Status> {
259        tracing::info!("Request to list all wallet utxos");
260        let utxos = self.node.wallet.list_utxos().await.unwrap();
261        let utxos: Vec<Vec<u8>> = utxos
262            .iter()
263            .map(|utxo| serde_json::to_vec(utxo).unwrap())
264            .collect();
265        Ok(Response::new(ListUtxosResponse { utxos }))
266    }
267
268    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
269    async fn list_peers(
270        &self,
271        _request: Request<ListPeersRequest>,
272    ) -> Result<Response<ListPeersResponse>, Status> {
273        tracing::info!("List peers request");
274        let peers = vec![];
275
276        Ok(Response::new(ListPeersResponse { peers }))
277    }
278
279    #[tracing::instrument(skip(self, request), name = "grpc_server")]
280    async fn connect_peer(
281        &self,
282        request: Request<ConnectRequest>,
283    ) -> Result<Response<ConnectResponse>, Status> {
284        let ConnectRequest { pubkey, host } = request.into_inner();
285        let pubkey = PublicKey::from_str(&pubkey).unwrap();
286        self.node.transport.connect_outbound(pubkey, &host).await;
287        Ok(Response::new(ConnectResponse {}))
288    }
289
290    async fn list_oracles(
291        &self,
292        _request: Request<ListOraclesRequest>,
293    ) -> Result<Response<ListOraclesResponse>, Status> {
294        let pubkey = self.node.oracle.get_public_key().to_string();
295        let name = self.node.oracle.name();
296        Ok(Response::new(ListOraclesResponse { name, pubkey }))
297    }
298
299    async fn list_contracts(
300        &self,
301        _request: Request<ListContractsRequest>,
302    ) -> Result<Response<ListContractsResponse>, Status> {
303        let contracts = self
304            .node
305            .storage
306            .get_contracts()
307            .await
308            .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
309        let contract_bytes: Vec<Vec<u8>> = contracts
310            .iter()
311            .map(|contract| serialize_contract(contract).unwrap())
312            .collect();
313        Ok(Response::new(ListContractsResponse {
314            contracts: contract_bytes,
315        }))
316    }
317
318    async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
319        let SendRequest {
320            address,
321            amount,
322            fee_rate,
323        } = request.into_inner();
324        let address = Address::from_str(&address).unwrap().assume_checked();
325        let amount = Amount::from_sat(amount);
326        let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
327            Some(f) => f,
328            None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
329        };
330        let txn = self
331            .node
332            .wallet
333            .send_to_address(address, amount, fee_rate)
334            .await;
335        if let Ok(tx) = txn {
336            Ok(Response::new(SendResponse {
337                txid: tx.to_string(),
338            }))
339        } else {
340            Err(Status::new(Code::Internal, "Transaction sending failed."))
341        }
342    }
343
344    async fn oracle_announcements(
345        &self,
346        request: Request<OracleAnnouncementsRequest>,
347    ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
348        let OracleAnnouncementsRequest { event_id } = request.into_inner();
349        let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
350        let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
351        Ok(Response::new(OracleAnnouncementsResponse { announcement }))
352    }
353
354    async fn create_enum(
355        &self,
356        request: Request<CreateEnumRequest>,
357    ) -> Result<Response<CreateEnumResponse>, Status> {
358        let CreateEnumRequest { maturity, outcomes } = request.into_inner();
359        let announcement = self
360            .node
361            .oracle
362            .create_enum_event(outcomes, maturity)
363            .await
364            .unwrap();
365        let announcement = serde_json::to_vec(&announcement).unwrap();
366        Ok(Response::new(CreateEnumResponse { announcement }))
367    }
368
369    async fn wallet_sync(
370        &self,
371        _request: Request<WalletSyncRequest>,
372    ) -> Result<Response<WalletSyncResponse>, Status> {
373        self.node
374            .wallet
375            .sync()
376            .await
377            .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
378        Ok(Response::new(WalletSyncResponse {}))
379    }
380
381    async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
382        if let Err(e) = self.node.manager.periodic_check(false).await {
383            tracing::error!("Error syncing: {:?}", e);
384            return Err(Status::new(Code::Internal, "Error syncing."));
385        };
386
387        if let Err(e) = self.node.wallet.sync().await {
388            tracing::error!("Error syncing wallet: {:?}", e);
389            return Err(Status::new(Code::Internal, "Error syncing wallet."));
390        };
391
392        Ok(Response::new(SyncResponse {}))
393    }
394}