ddk_node/
lib.rs

1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::{Builder, SeedConfig};
10use ddk::logger::{LogLevel, Logger};
11use ddk::oracle::kormir::KormirOracleClient;
12use ddk::storage::postgres::PostgresStore;
13use ddk::transport::nostr::NostrDlc;
14use ddk::util::ser::serialize_contract;
15use ddk::DlcDevKit;
16use ddk::{Oracle, Transport};
17use ddk_manager::contract::contract_input::ContractInput;
18use ddk_manager::Oracle as DlcOracle;
19use ddk_manager::Storage as DlcStorage;
20use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
21use ddkrpc::{
22    AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
23    CreateEnumResponse, CreateNumericRequest, CreateNumericResponse, GetWalletTransactionsRequest,
24    GetWalletTransactionsResponse, ListContractsRequest, ListContractsResponse, ListOffersRequest,
25    ListOffersResponse, ListOraclesRequest, ListOraclesResponse, ListPeersRequest,
26    ListPeersResponse, ListUtxosRequest, ListUtxosResponse, NewAddressRequest, NewAddressResponse,
27    OracleAnnouncementsRequest, OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse,
28    SendRequest, SendResponse, SignRequest, SignResponse, SyncRequest, SyncResponse,
29    WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest, WalletSyncResponse,
30};
31use ddkrpc::{InfoRequest, InfoResponse};
32use opts::NodeOpts;
33use std::str::FromStr;
34use std::sync::Arc;
35use tonic::transport::Server;
36use tonic::Request;
37use tonic::Response;
38use tonic::Status;
39use tonic::{async_trait, Code};
40
41type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
42
43#[derive(Clone)]
44pub struct DdkNode {
45    pub node: Arc<Ddk>,
46}
47
48impl DdkNode {
49    pub fn new(ddk: Ddk) -> Self {
50        Self {
51            node: Arc::new(ddk),
52        }
53    }
54
55    pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
56        let logger = Arc::new(Logger::console(
57            "console_logger".to_string(),
58            LogLevel::Info,
59        ));
60        let storage_path = match opts.storage_dir {
61            Some(storage) => storage,
62            None => homedir::my_home()
63                .expect("Provide a directory for ddk.")
64                .unwrap()
65                .join(".ddk")
66                .join("default-ddk"),
67        };
68        let network = Network::from_str(&opts.network)?;
69        std::fs::create_dir_all(storage_path.clone())?;
70
71        let seed_bytes = crate::seed::xprv_from_path(storage_path.clone())?;
72
73        let transport = Arc::new(
74            NostrDlc::new(
75                &seed_bytes,
76                "wss://nostr.dlcdevkit.com",
77                network,
78                logger.clone(),
79            )
80            .await?,
81        );
82
83        let storage = Arc::new(
84            PostgresStore::new(&opts.postgres_url, true, logger.clone(), opts.name).await?,
85        );
86
87        // let oracle = Arc::new(P2PDOracleClient::new(&oracle_host).await?);
88        let oracle =
89            Arc::new(KormirOracleClient::new(&opts.oracle_host, None, logger.clone()).await?);
90
91        let mut builder = Builder::new();
92        builder.set_seed_bytes(SeedConfig::Bytes(seed_bytes))?;
93        builder.set_esplora_host(opts.esplora_host);
94        builder.set_network(network);
95        builder.set_transport(transport.clone());
96        builder.set_storage(storage.clone());
97        builder.set_oracle(oracle.clone());
98        builder.set_logger(logger.clone());
99
100        let ddk: Ddk = builder.finish().await?;
101
102        ddk.start()?;
103        let node = DdkNode::new(ddk);
104        let node_stop = node.node.clone();
105        let server = Server::builder()
106            .add_service(DdkRpcServer::new(node))
107            .serve_with_shutdown(opts.grpc_host.parse()?, async {
108                tokio::signal::ctrl_c()
109                    .await
110                    .expect("Failed to install Ctrl+C signal handler");
111                let _ = node_stop.stop();
112            });
113
114        server.await?;
115
116        Ok(())
117    }
118}
119
120#[async_trait]
121impl DdkRpc for DdkNode {
122    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
123    async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
124        tracing::info!("Request for node info.");
125        let pubkey = self.node.transport.public_key().to_string();
126        let transport = self.node.transport.name();
127        let oracle = self.node.oracle.name();
128        let response = InfoResponse {
129            pubkey,
130            transport,
131            oracle,
132        };
133        Ok(Response::new(response))
134    }
135
136    #[tracing::instrument(skip(self, request), name = "grpc_server")]
137    async fn send_offer(
138        &self,
139        request: Request<SendOfferRequest>,
140    ) -> Result<Response<SendOfferResponse>, Status> {
141        tracing::info!("Request to send offer.");
142        let SendOfferRequest {
143            contract_input,
144            counter_party,
145        } = request.into_inner();
146        let contract_input: ContractInput =
147            serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
148        let mut oracle_announcements = Vec::new();
149        for info in &contract_input.contract_infos {
150            let announcement = self
151                .node
152                .oracle
153                .get_announcement(&info.oracles.event_id)
154                .await
155                .unwrap();
156            oracle_announcements.push(announcement)
157        }
158
159        let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
160        let offer_msg = self
161            .node
162            .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
163            .await
164            .map_err(|e| {
165                Status::new(
166                    Code::Cancelled,
167                    format!("Contract offer could not be sent to counterparty. error={e}"),
168                )
169            })?;
170
171        let offer_dlc =
172            serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
173        Ok(Response::new(SendOfferResponse { offer_dlc }))
174    }
175
176    #[tracing::instrument(skip(self, request), name = "grpc_server")]
177    async fn accept_offer(
178        &self,
179        request: Request<AcceptOfferRequest>,
180    ) -> Result<Response<AcceptOfferResponse>, Status> {
181        tracing::info!("Request to accept offer.");
182        let mut contract_id = [0u8; 32];
183        let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
184        contract_id.copy_from_slice(&contract_id_bytes);
185        let (contract_id, counter_party, accept_dlc) =
186            self.node.accept_dlc_offer(contract_id).await.map_err(|e| {
187                Status::new(
188                    Code::Cancelled,
189                    format!("Contract could not be accepted. error={e:?}"),
190                )
191            })?;
192
193        let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
194            Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
195        })?;
196
197        Ok(Response::new(AcceptOfferResponse {
198            contract_id,
199            counter_party,
200            accept_dlc,
201        }))
202    }
203
204    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
205    async fn new_address(
206        &self,
207        _request: Request<NewAddressRequest>,
208    ) -> Result<Response<NewAddressResponse>, Status> {
209        tracing::info!("Request for new wallet address");
210        let address = self
211            .node
212            .wallet
213            .new_external_address()
214            .await
215            .unwrap()
216            .to_string();
217        let response = NewAddressResponse { address };
218        Ok(Response::new(response))
219    }
220
221    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
222    async fn list_offers(
223        &self,
224        _request: Request<ListOffersRequest>,
225    ) -> Result<Response<ListOffersResponse>, Status> {
226        tracing::info!("Request for offers to the node.");
227        let offers = self.node.storage.get_contract_offers().await.unwrap();
228        let offers: Vec<Vec<u8>> = offers
229            .iter()
230            .map(|offer| serde_json::to_vec(offer).unwrap())
231            .collect();
232
233        Ok(Response::new(ListOffersResponse { offers }))
234    }
235
236    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
237    async fn wallet_balance(
238        &self,
239        _request: Request<WalletBalanceRequest>,
240    ) -> Result<Response<WalletBalanceResponse>, Status> {
241        tracing::info!("Request for wallet balance.");
242        let wallet_balance = self.node.balance().await.unwrap();
243
244        let response = WalletBalanceResponse {
245            confirmed: wallet_balance.confirmed.to_sat(),
246            foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
247            change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
248            contract_balance: wallet_balance.contract_pnl,
249        };
250        Ok(Response::new(response))
251    }
252
253    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
254    async fn get_wallet_transactions(
255        &self,
256        _request: Request<GetWalletTransactionsRequest>,
257    ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
258        tracing::info!("Request for all wallet transactions.");
259        let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
260        let transactions: Vec<Vec<u8>> = wallet_transactions
261            .iter()
262            .map(|t| serde_json::to_vec(&t).unwrap())
263            .collect();
264        Ok(Response::new(GetWalletTransactionsResponse {
265            transactions,
266        }))
267    }
268
269    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
270    async fn list_utxos(
271        &self,
272        _request: Request<ListUtxosRequest>,
273    ) -> Result<Response<ListUtxosResponse>, Status> {
274        tracing::info!("Request to list all wallet utxos");
275        let utxos = self.node.wallet.list_utxos().await.unwrap();
276        let utxos: Vec<Vec<u8>> = utxos
277            .iter()
278            .map(|utxo| serde_json::to_vec(utxo).unwrap())
279            .collect();
280        Ok(Response::new(ListUtxosResponse { utxos }))
281    }
282
283    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
284    async fn list_peers(
285        &self,
286        _request: Request<ListPeersRequest>,
287    ) -> Result<Response<ListPeersResponse>, Status> {
288        tracing::info!("List peers request");
289        let peers = vec![];
290
291        Ok(Response::new(ListPeersResponse { peers }))
292    }
293
294    #[tracing::instrument(skip(self, request), name = "grpc_server")]
295    async fn connect_peer(
296        &self,
297        request: Request<ConnectRequest>,
298    ) -> Result<Response<ConnectResponse>, Status> {
299        let ConnectRequest { pubkey, host } = request.into_inner();
300        let pubkey = PublicKey::from_str(&pubkey).unwrap();
301        self.node.transport.connect_outbound(pubkey, &host).await;
302        Ok(Response::new(ConnectResponse {}))
303    }
304
305    async fn list_oracles(
306        &self,
307        _request: Request<ListOraclesRequest>,
308    ) -> Result<Response<ListOraclesResponse>, Status> {
309        let pubkey = self.node.oracle.get_public_key().to_string();
310        let name = self.node.oracle.name();
311        Ok(Response::new(ListOraclesResponse { name, pubkey }))
312    }
313
314    async fn list_contracts(
315        &self,
316        _request: Request<ListContractsRequest>,
317    ) -> Result<Response<ListContractsResponse>, Status> {
318        let contracts = self
319            .node
320            .storage
321            .get_contracts()
322            .await
323            .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
324        let contract_bytes: Vec<Vec<u8>> = contracts
325            .iter()
326            .map(|contract| serialize_contract(contract).unwrap())
327            .collect();
328        Ok(Response::new(ListContractsResponse {
329            contracts: contract_bytes,
330        }))
331    }
332
333    async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
334        let SendRequest {
335            address,
336            amount,
337            fee_rate,
338        } = request.into_inner();
339        let address = Address::from_str(&address).unwrap().assume_checked();
340        let amount = Amount::from_sat(amount);
341        let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
342            Some(f) => f,
343            None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
344        };
345        let txn = self
346            .node
347            .wallet
348            .send_to_address(address, amount, fee_rate)
349            .await;
350        if let Ok(tx) = txn {
351            Ok(Response::new(SendResponse {
352                txid: tx.to_string(),
353            }))
354        } else {
355            Err(Status::new(Code::Internal, "Transaction sending failed."))
356        }
357    }
358
359    async fn oracle_announcements(
360        &self,
361        request: Request<OracleAnnouncementsRequest>,
362    ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
363        let OracleAnnouncementsRequest { event_id } = request.into_inner();
364        let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
365        let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
366        Ok(Response::new(OracleAnnouncementsResponse { announcement }))
367    }
368
369    async fn create_enum(
370        &self,
371        request: Request<CreateEnumRequest>,
372    ) -> Result<Response<CreateEnumResponse>, Status> {
373        let CreateEnumRequest { maturity, outcomes } = request.into_inner();
374        let announcement = self
375            .node
376            .oracle
377            .create_enum_event(outcomes, maturity)
378            .await
379            .unwrap();
380        let announcement = serde_json::to_vec(&announcement).unwrap();
381        Ok(Response::new(CreateEnumResponse { announcement }))
382    }
383
384    async fn wallet_sync(
385        &self,
386        _request: Request<WalletSyncRequest>,
387    ) -> Result<Response<WalletSyncResponse>, Status> {
388        self.node
389            .wallet
390            .sync()
391            .await
392            .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
393        Ok(Response::new(WalletSyncResponse {}))
394    }
395
396    async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
397        if let Err(e) = self.node.manager.periodic_check(false).await {
398            tracing::error!("Error syncing: {:?}", e);
399            return Err(Status::new(Code::Internal, "Error syncing."));
400        };
401
402        if let Err(e) = self.node.wallet.sync().await {
403            tracing::error!("Error syncing wallet: {:?}", e);
404            return Err(Status::new(Code::Internal, "Error syncing wallet."));
405        };
406
407        Ok(Response::new(SyncResponse {}))
408    }
409
410    async fn create_numeric(
411        &self,
412        request: Request<CreateNumericRequest>,
413    ) -> Result<Response<CreateNumericResponse>, Status> {
414        let CreateNumericRequest {
415            maturity,
416            nb_digits,
417        } = request.into_inner();
418        let announcement = self
419            .node
420            .oracle
421            .create_numeric_event(
422                Some(nb_digits as u16), // Number of digits
423                None,                   // Default is_signed to false
424                None,                   // Default precision to 0
425                "unit".to_string(),     // Default unit
426                maturity,               // Maturity timestamp
427            )
428            .await
429            .map_err(|e| Status::internal(e.to_string()))?;
430
431        let announcement_bytes =
432            serde_json::to_vec(&announcement).map_err(|e| Status::internal(e.to_string()))?;
433
434        Ok(Response::new(CreateNumericResponse {
435            announcement: announcement_bytes,
436        }))
437    }
438
439    async fn sign_announcement(
440        &self,
441        request: Request<SignRequest>,
442    ) -> Result<Response<SignResponse>, Status> {
443        let SignRequest { event_id, outcome } = request.into_inner();
444        let attestation = match outcome {
445            Some(ddkrpc::sign_request::Outcome::EnumOutcome(outcome)) => {
446                self.node.oracle.sign_enum_event(event_id, outcome).await
447            }
448            Some(ddkrpc::sign_request::Outcome::NumericOutcome(outcome)) => {
449                self.node.oracle.sign_numeric_event(event_id, outcome).await
450            }
451            None => return Err(Status::invalid_argument("Outcome must be specified")),
452        }
453        .map_err(|e| Status::internal(e.to_string()))?;
454
455        let signature =
456            serde_json::to_vec(&attestation).map_err(|e| Status::internal(e.to_string()))?;
457
458        Ok(Response::new(SignResponse { signature }))
459    }
460}