ddk_node/
lib.rs

1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::Builder;
10use ddk::oracle::kormir::KormirOracleClient;
11use ddk::storage::postgres::PostgresStore;
12use ddk::transport::nostr::NostrDlc;
13use ddk::util::ser::serialize_contract;
14use ddk::DlcDevKit;
15use ddk::{Oracle, Transport};
16use ddk_manager::contract::contract_input::ContractInput;
17use ddk_manager::Oracle as DlcOracle;
18use ddk_manager::Storage as DlcStorage;
19use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
20use ddkrpc::{
21    AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
22    CreateEnumResponse, GetWalletTransactionsRequest, GetWalletTransactionsResponse,
23    ListContractsRequest, ListContractsResponse, ListOffersRequest, ListOffersResponse,
24    ListOraclesRequest, ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest,
25    ListUtxosResponse, NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest,
26    OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse,
27    SyncRequest, SyncResponse, WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest,
28    WalletSyncResponse,
29};
30use ddkrpc::{InfoRequest, InfoResponse};
31use opts::NodeOpts;
32use std::str::FromStr;
33use std::sync::Arc;
34use tonic::transport::Server;
35use tonic::Request;
36use tonic::Response;
37use tonic::Status;
38use tonic::{async_trait, Code};
39
40type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
41
42#[derive(Clone)]
43pub struct DdkNode {
44    pub node: Arc<Ddk>,
45}
46
47impl DdkNode {
48    pub fn new(ddk: Ddk) -> Self {
49        Self {
50            node: Arc::new(ddk),
51        }
52    }
53
54    pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
55        let storage_path = match opts.storage_dir {
56            Some(storage) => storage,
57            None => homedir::my_home()
58                .expect("Provide a directory for ddk.")
59                .unwrap()
60                .join(".ddk")
61                .join("default-ddk"),
62        };
63        let network = Network::from_str(&opts.network)?;
64        std::fs::create_dir_all(storage_path.clone())?;
65
66        let seed_bytes = crate::seed::xprv_from_path(storage_path.clone(), network)?;
67
68        tracing::info!("Starting DDK node.");
69
70        let transport = Arc::new(
71            NostrDlc::new(
72                &seed_bytes.private_key.secret_bytes(),
73                "wss://nostr.dlcdevkit.com",
74                network,
75            )
76            .await?,
77        );
78
79        let storage = Arc::new(PostgresStore::new(&opts.postgres_url, true, opts.name).await?);
80
81        // let oracle = Arc::new(P2PDOracleClient::new(&oracle_host).await?);
82        let oracle = Arc::new(KormirOracleClient::new(&opts.oracle_host, None).await?);
83
84        let mut builder = Builder::new();
85        builder.set_seed_bytes(seed_bytes.private_key.secret_bytes());
86        builder.set_esplora_host(opts.esplora_host);
87        builder.set_network(network);
88        builder.set_transport(transport.clone());
89        builder.set_storage(storage.clone());
90        builder.set_oracle(oracle.clone());
91
92        let ddk: Ddk = builder.finish().await?;
93
94        ddk.start()?;
95        let node = DdkNode::new(ddk);
96        let node_stop = node.node.clone();
97        let server = Server::builder()
98            .add_service(DdkRpcServer::new(node))
99            .serve_with_shutdown(opts.grpc_host.parse()?, async {
100                tokio::signal::ctrl_c()
101                    .await
102                    .expect("Failed to install Ctrl+C signal handler");
103                let _ = node_stop.stop();
104            });
105
106        server.await?;
107
108        Ok(())
109    }
110}
111
112#[async_trait]
113impl DdkRpc for DdkNode {
114    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
115    async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
116        tracing::info!("Request for node info.");
117        let pubkey = self.node.transport.public_key().to_string();
118        let transport = self.node.transport.name();
119        let oracle = self.node.oracle.name();
120        let response = InfoResponse {
121            pubkey,
122            transport,
123            oracle,
124        };
125        Ok(Response::new(response))
126    }
127
128    #[tracing::instrument(skip(self, request), name = "grpc_server")]
129    async fn send_offer(
130        &self,
131        request: Request<SendOfferRequest>,
132    ) -> Result<Response<SendOfferResponse>, Status> {
133        tracing::info!("Request to send offer.");
134        let SendOfferRequest {
135            contract_input,
136            counter_party,
137        } = request.into_inner();
138        let contract_input: ContractInput =
139            serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
140        let mut oracle_announcements = Vec::new();
141        for info in &contract_input.contract_infos {
142            let announcement = self
143                .node
144                .oracle
145                .get_announcement(&info.oracles.event_id)
146                .await
147                .unwrap();
148            oracle_announcements.push(announcement)
149        }
150
151        let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
152        let offer_msg = self
153            .node
154            .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
155            .await
156            .map_err(|e| {
157                Status::new(
158                    Code::Cancelled,
159                    format!("Contract offer could not be sent to counterparty. error={e}"),
160                )
161            })?;
162
163        let offer_dlc =
164            serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
165        Ok(Response::new(SendOfferResponse { offer_dlc }))
166    }
167
168    #[tracing::instrument(skip(self, request), name = "grpc_server")]
169    async fn accept_offer(
170        &self,
171        request: Request<AcceptOfferRequest>,
172    ) -> Result<Response<AcceptOfferResponse>, Status> {
173        tracing::info!("Request to accept offer.");
174        let mut contract_id = [0u8; 32];
175        let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
176        contract_id.copy_from_slice(&contract_id_bytes);
177        let (contract_id, counter_party, accept_dlc) = self
178            .node
179            .accept_dlc_offer(contract_id)
180            .await
181            .map_err(|_| Status::new(Code::Cancelled, "Contract could not be accepted."))?;
182
183        let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
184            Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
185        })?;
186
187        Ok(Response::new(AcceptOfferResponse {
188            contract_id,
189            counter_party,
190            accept_dlc,
191        }))
192    }
193
194    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
195    async fn new_address(
196        &self,
197        _request: Request<NewAddressRequest>,
198    ) -> Result<Response<NewAddressResponse>, Status> {
199        tracing::info!("Request for new wallet address");
200        let address = self
201            .node
202            .wallet
203            .new_external_address()
204            .await
205            .unwrap()
206            .to_string();
207        let response = NewAddressResponse { address };
208        Ok(Response::new(response))
209    }
210
211    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
212    async fn list_offers(
213        &self,
214        _request: Request<ListOffersRequest>,
215    ) -> Result<Response<ListOffersResponse>, Status> {
216        tracing::info!("Request for offers to the node.");
217        let offers = self.node.storage.get_contract_offers().await.unwrap();
218        tracing::info!("Offers: {:?}", offers);
219        let offers: Vec<Vec<u8>> = offers
220            .iter()
221            .map(|offer| serde_json::to_vec(offer).unwrap())
222            .collect();
223
224        Ok(Response::new(ListOffersResponse { offers }))
225    }
226
227    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
228    async fn wallet_balance(
229        &self,
230        _request: Request<WalletBalanceRequest>,
231    ) -> Result<Response<WalletBalanceResponse>, Status> {
232        tracing::info!("Request for wallet balance.");
233        let wallet_balance = self.node.balance().await.unwrap();
234
235        let response = WalletBalanceResponse {
236            confirmed: wallet_balance.confirmed.to_sat(),
237            foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
238            change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
239            contract_balance: wallet_balance.contract_pnl,
240        };
241        Ok(Response::new(response))
242    }
243
244    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
245    async fn get_wallet_transactions(
246        &self,
247        _request: Request<GetWalletTransactionsRequest>,
248    ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
249        tracing::info!("Request for all wallet transactions.");
250        let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
251        let transactions: Vec<Vec<u8>> = wallet_transactions
252            .iter()
253            .map(|t| serde_json::to_vec(&t).unwrap())
254            .collect();
255        Ok(Response::new(GetWalletTransactionsResponse {
256            transactions,
257        }))
258    }
259
260    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
261    async fn list_utxos(
262        &self,
263        _request: Request<ListUtxosRequest>,
264    ) -> Result<Response<ListUtxosResponse>, Status> {
265        tracing::info!("Request to list all wallet utxos");
266        let utxos = self.node.wallet.list_utxos().await.unwrap();
267        let utxos: Vec<Vec<u8>> = utxos
268            .iter()
269            .map(|utxo| serde_json::to_vec(utxo).unwrap())
270            .collect();
271        Ok(Response::new(ListUtxosResponse { utxos }))
272    }
273
274    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
275    async fn list_peers(
276        &self,
277        _request: Request<ListPeersRequest>,
278    ) -> Result<Response<ListPeersResponse>, Status> {
279        tracing::info!("List peers request");
280        let peers = vec![];
281
282        Ok(Response::new(ListPeersResponse { peers }))
283    }
284
285    #[tracing::instrument(skip(self, request), name = "grpc_server")]
286    async fn connect_peer(
287        &self,
288        request: Request<ConnectRequest>,
289    ) -> Result<Response<ConnectResponse>, Status> {
290        let ConnectRequest { pubkey, host } = request.into_inner();
291        let pubkey = PublicKey::from_str(&pubkey).unwrap();
292        self.node.transport.connect_outbound(pubkey, &host).await;
293        Ok(Response::new(ConnectResponse {}))
294    }
295
296    async fn list_oracles(
297        &self,
298        _request: Request<ListOraclesRequest>,
299    ) -> Result<Response<ListOraclesResponse>, Status> {
300        let pubkey = self.node.oracle.get_public_key().to_string();
301        let name = self.node.oracle.name();
302        Ok(Response::new(ListOraclesResponse { name, pubkey }))
303    }
304
305    async fn list_contracts(
306        &self,
307        _request: Request<ListContractsRequest>,
308    ) -> Result<Response<ListContractsResponse>, Status> {
309        let contracts = self
310            .node
311            .storage
312            .get_contracts()
313            .await
314            .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
315        let contract_bytes: Vec<Vec<u8>> = contracts
316            .iter()
317            .map(|contract| serialize_contract(contract).unwrap())
318            .collect();
319        Ok(Response::new(ListContractsResponse {
320            contracts: contract_bytes,
321        }))
322    }
323
324    async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
325        let SendRequest {
326            address,
327            amount,
328            fee_rate,
329        } = request.into_inner();
330        let address = Address::from_str(&address).unwrap().assume_checked();
331        let amount = Amount::from_sat(amount);
332        let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
333            Some(f) => f,
334            None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
335        };
336        let txn = self
337            .node
338            .wallet
339            .send_to_address(address, amount, fee_rate)
340            .await;
341        if let Ok(tx) = txn {
342            Ok(Response::new(SendResponse {
343                txid: tx.to_string(),
344            }))
345        } else {
346            Err(Status::new(Code::Internal, "Transaction sending failed."))
347        }
348    }
349
350    async fn oracle_announcements(
351        &self,
352        request: Request<OracleAnnouncementsRequest>,
353    ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
354        let OracleAnnouncementsRequest { event_id } = request.into_inner();
355        let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
356        let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
357        Ok(Response::new(OracleAnnouncementsResponse { announcement }))
358    }
359
360    async fn create_enum(
361        &self,
362        request: Request<CreateEnumRequest>,
363    ) -> Result<Response<CreateEnumResponse>, Status> {
364        let CreateEnumRequest { maturity, outcomes } = request.into_inner();
365        let announcement = self
366            .node
367            .oracle
368            .create_enum_event(outcomes, maturity)
369            .await
370            .unwrap();
371        let announcement = serde_json::to_vec(&announcement).unwrap();
372        Ok(Response::new(CreateEnumResponse { announcement }))
373    }
374
375    async fn wallet_sync(
376        &self,
377        _request: Request<WalletSyncRequest>,
378    ) -> Result<Response<WalletSyncResponse>, Status> {
379        self.node
380            .wallet
381            .sync()
382            .await
383            .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
384        Ok(Response::new(WalletSyncResponse {}))
385    }
386
387    async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
388        if let Err(e) = self.node.manager.periodic_check(false).await {
389            tracing::error!("Error syncing: {:?}", e);
390            return Err(Status::new(Code::Internal, "Error syncing."));
391        };
392
393        if let Err(e) = self.node.wallet.sync().await {
394            tracing::error!("Error syncing wallet: {:?}", e);
395            return Err(Status::new(Code::Internal, "Error syncing wallet."));
396        };
397
398        Ok(Response::new(SyncResponse {}))
399    }
400}