Skip to main content

ddk_node/
lib.rs

1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::{Builder, SeedConfig};
10use ddk::logger::{LogLevel, Logger};
11use ddk::oracle::kormir::KormirOracleClient;
12use ddk::storage::postgres::PostgresStore;
13use ddk::transport::nostr::NostrDlc;
14use ddk::util::ser::serialize_contract;
15use ddk::DlcDevKit;
16use ddk::{Oracle, Transport};
17use ddk_manager::contract::contract_input::ContractInput;
18use ddk_manager::Oracle as DlcOracle;
19use ddk_manager::Storage as DlcStorage;
20use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
21use ddkrpc::{
22    AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
23    CreateEnumResponse, CreateNumericRequest, CreateNumericResponse, GetWalletTransactionsRequest,
24    GetWalletTransactionsResponse, ListContractsRequest, ListContractsResponse, ListOffersRequest,
25    ListOffersResponse, ListOraclesRequest, ListOraclesResponse, ListPeersRequest,
26    ListPeersResponse, ListUtxosRequest, ListUtxosResponse, NewAddressRequest, NewAddressResponse,
27    OracleAnnouncementsRequest, OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse,
28    SendRequest, SendResponse, SignRequest, SignResponse, SyncRequest, SyncResponse,
29    WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest, WalletSyncResponse,
30};
31use ddkrpc::{InfoRequest, InfoResponse};
32use opts::NodeOpts;
33use std::str::FromStr;
34use std::sync::Arc;
35use tonic::transport::Server;
36use tonic::Request;
37use tonic::Response;
38use tonic::Status;
39use tonic::{async_trait, Code};
40
41type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
42
43#[derive(Clone)]
44pub struct DdkNode {
45    pub node: Arc<Ddk>,
46}
47
48impl DdkNode {
49    pub fn new(ddk: Ddk) -> Self {
50        Self {
51            node: Arc::new(ddk),
52        }
53    }
54
55    pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
56        let logger = Arc::new(Logger::console(
57            "console_logger".to_string(),
58            LogLevel::from(opts.log),
59        ));
60        let storage_path = match opts.storage_dir {
61            Some(storage) => storage,
62            None => homedir::my_home()
63                .expect("Provide a directory for ddk.")
64                .unwrap()
65                .join(".ddk")
66                .join("default-ddk"),
67        };
68        let network = Network::from_str(&opts.network)?;
69        std::fs::create_dir_all(storage_path.clone())?;
70
71        let seed_bytes = crate::seed::xprv_from_path(storage_path.clone())?;
72
73        let transport = Arc::new(
74            NostrDlc::new(
75                &seed_bytes,
76                "wss://nostr.dlcdevkit.com",
77                network,
78                logger.clone(),
79            )
80            .await?,
81        );
82
83        let storage = Arc::new(
84            PostgresStore::new(&opts.postgres_url, true, logger.clone(), opts.name).await?,
85        );
86
87        // let oracle = Arc::new(P2PDOracleClient::new(&oracle_host).await?);
88        let oracle =
89            Arc::new(KormirOracleClient::new(&opts.oracle_host, None, logger.clone()).await?);
90
91        let mut builder = Builder::new();
92        builder.set_seed_bytes(SeedConfig::Bytes(seed_bytes))?;
93        builder.set_esplora_host(opts.esplora_host);
94        builder.set_network(network);
95        builder.set_transport(transport.clone());
96        builder.set_storage(storage.clone());
97        builder.set_oracle(oracle.clone());
98        builder.set_logger(logger.clone());
99
100        if let Some(endpoint) = opts.zmq_blockhash_endpoint.filter(|e| !e.is_empty()) {
101            builder.set_zmq_blockhash_endpoint(endpoint);
102        }
103
104        let ddk: Ddk = builder.finish().await?;
105
106        ddk.start()?;
107        let node = DdkNode::new(ddk);
108        let node_stop = node.node.clone();
109        let server = Server::builder()
110            .add_service(DdkRpcServer::new(node))
111            .serve_with_shutdown(opts.grpc_host.parse()?, async {
112                tokio::signal::ctrl_c()
113                    .await
114                    .expect("Failed to install Ctrl+C signal handler");
115                let _ = node_stop.stop();
116            });
117
118        server.await?;
119
120        Ok(())
121    }
122}
123
124#[async_trait]
125impl DdkRpc for DdkNode {
126    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
127    async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
128        tracing::info!("Request for node info.");
129        let pubkey = self.node.transport.public_key().to_string();
130        let transport = self.node.transport.name();
131        let oracle = self.node.oracle.name();
132        let response = InfoResponse {
133            pubkey,
134            transport,
135            oracle,
136        };
137        Ok(Response::new(response))
138    }
139
140    #[tracing::instrument(skip(self, request), name = "grpc_server")]
141    async fn send_offer(
142        &self,
143        request: Request<SendOfferRequest>,
144    ) -> Result<Response<SendOfferResponse>, Status> {
145        tracing::info!("Request to send offer.");
146        let SendOfferRequest {
147            contract_input,
148            counter_party,
149        } = request.into_inner();
150        let contract_input: ContractInput =
151            serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
152        let mut oracle_announcements = Vec::new();
153        for info in &contract_input.contract_infos {
154            let announcement = self
155                .node
156                .oracle
157                .get_announcement(&info.oracles.event_id)
158                .await
159                .unwrap();
160            oracle_announcements.push(announcement)
161        }
162
163        let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
164        let offer_msg = self
165            .node
166            .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
167            .await
168            .map_err(|e| {
169                Status::new(
170                    Code::Cancelled,
171                    format!("Contract offer could not be sent to counterparty. error={e}"),
172                )
173            })?;
174
175        let offer_dlc =
176            serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
177        Ok(Response::new(SendOfferResponse { offer_dlc }))
178    }
179
180    #[tracing::instrument(skip(self, request), name = "grpc_server")]
181    async fn accept_offer(
182        &self,
183        request: Request<AcceptOfferRequest>,
184    ) -> Result<Response<AcceptOfferResponse>, Status> {
185        tracing::info!("Request to accept offer.");
186        let mut contract_id = [0u8; 32];
187        let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
188        contract_id.copy_from_slice(&contract_id_bytes);
189        let (contract_id, counter_party, accept_dlc) =
190            self.node.accept_dlc_offer(contract_id).await.map_err(|e| {
191                Status::new(
192                    Code::Cancelled,
193                    format!("Contract could not be accepted. error={e:?}"),
194                )
195            })?;
196
197        let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
198            Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
199        })?;
200
201        Ok(Response::new(AcceptOfferResponse {
202            contract_id,
203            counter_party,
204            accept_dlc,
205        }))
206    }
207
208    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
209    async fn new_address(
210        &self,
211        _request: Request<NewAddressRequest>,
212    ) -> Result<Response<NewAddressResponse>, Status> {
213        tracing::info!("Request for new wallet address");
214        let address = self
215            .node
216            .wallet
217            .new_external_address()
218            .await
219            .unwrap()
220            .to_string();
221        let response = NewAddressResponse { address };
222        Ok(Response::new(response))
223    }
224
225    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
226    async fn list_offers(
227        &self,
228        _request: Request<ListOffersRequest>,
229    ) -> Result<Response<ListOffersResponse>, Status> {
230        tracing::info!("Request for offers to the node.");
231        let offers = self.node.storage.get_contract_offers().await.unwrap();
232        let offers: Vec<Vec<u8>> = offers
233            .iter()
234            .map(|offer| serde_json::to_vec(offer).unwrap())
235            .collect();
236
237        Ok(Response::new(ListOffersResponse { offers }))
238    }
239
240    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
241    async fn wallet_balance(
242        &self,
243        _request: Request<WalletBalanceRequest>,
244    ) -> Result<Response<WalletBalanceResponse>, Status> {
245        tracing::info!("Request for wallet balance.");
246        let wallet_balance = self.node.balance().await.unwrap();
247
248        let response = WalletBalanceResponse {
249            confirmed: wallet_balance.confirmed.to_sat(),
250            foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
251            change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
252            contract_balance: wallet_balance.contract_pnl,
253        };
254        Ok(Response::new(response))
255    }
256
257    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
258    async fn get_wallet_transactions(
259        &self,
260        _request: Request<GetWalletTransactionsRequest>,
261    ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
262        tracing::info!("Request for all wallet transactions.");
263        let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
264        let transactions: Vec<Vec<u8>> = wallet_transactions
265            .iter()
266            .map(|t| serde_json::to_vec(&t).unwrap())
267            .collect();
268        Ok(Response::new(GetWalletTransactionsResponse {
269            transactions,
270        }))
271    }
272
273    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
274    async fn list_utxos(
275        &self,
276        _request: Request<ListUtxosRequest>,
277    ) -> Result<Response<ListUtxosResponse>, Status> {
278        tracing::info!("Request to list all wallet utxos");
279        let utxos = self.node.wallet.list_utxos().await.unwrap();
280        let utxos: Vec<Vec<u8>> = utxos
281            .iter()
282            .map(|utxo| serde_json::to_vec(utxo).unwrap())
283            .collect();
284        Ok(Response::new(ListUtxosResponse { utxos }))
285    }
286
287    #[tracing::instrument(skip(self, _request), name = "grpc_server")]
288    async fn list_peers(
289        &self,
290        _request: Request<ListPeersRequest>,
291    ) -> Result<Response<ListPeersResponse>, Status> {
292        tracing::info!("List peers request");
293        let peers = vec![];
294
295        Ok(Response::new(ListPeersResponse { peers }))
296    }
297
298    #[tracing::instrument(skip(self, request), name = "grpc_server")]
299    async fn connect_peer(
300        &self,
301        request: Request<ConnectRequest>,
302    ) -> Result<Response<ConnectResponse>, Status> {
303        let ConnectRequest { pubkey, host } = request.into_inner();
304        let pubkey = PublicKey::from_str(&pubkey).unwrap();
305        self.node.transport.connect_outbound(pubkey, &host).await;
306        Ok(Response::new(ConnectResponse {}))
307    }
308
309    async fn list_oracles(
310        &self,
311        _request: Request<ListOraclesRequest>,
312    ) -> Result<Response<ListOraclesResponse>, Status> {
313        let pubkey = self.node.oracle.get_public_key().to_string();
314        let name = self.node.oracle.name();
315        Ok(Response::new(ListOraclesResponse { name, pubkey }))
316    }
317
318    async fn list_contracts(
319        &self,
320        _request: Request<ListContractsRequest>,
321    ) -> Result<Response<ListContractsResponse>, Status> {
322        let contracts = self
323            .node
324            .storage
325            .get_contracts()
326            .await
327            .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
328        let contract_bytes: Vec<Vec<u8>> = contracts
329            .iter()
330            .map(|contract| serialize_contract(contract).unwrap())
331            .collect();
332        Ok(Response::new(ListContractsResponse {
333            contracts: contract_bytes,
334        }))
335    }
336
337    async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
338        let SendRequest {
339            address,
340            amount,
341            fee_rate,
342        } = request.into_inner();
343        let address = Address::from_str(&address).unwrap().assume_checked();
344        let amount = Amount::from_sat(amount);
345        let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
346            Some(f) => f,
347            None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
348        };
349        let txn = self
350            .node
351            .wallet
352            .send_to_address(address, amount, fee_rate)
353            .await;
354        if let Ok(tx) = txn {
355            Ok(Response::new(SendResponse {
356                txid: tx.to_string(),
357            }))
358        } else {
359            Err(Status::new(Code::Internal, "Transaction sending failed."))
360        }
361    }
362
363    async fn oracle_announcements(
364        &self,
365        request: Request<OracleAnnouncementsRequest>,
366    ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
367        let OracleAnnouncementsRequest { event_id } = request.into_inner();
368        let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
369        let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
370        Ok(Response::new(OracleAnnouncementsResponse { announcement }))
371    }
372
373    async fn create_enum(
374        &self,
375        request: Request<CreateEnumRequest>,
376    ) -> Result<Response<CreateEnumResponse>, Status> {
377        let CreateEnumRequest { maturity, outcomes } = request.into_inner();
378        let announcement = self
379            .node
380            .oracle
381            .create_enum_event(outcomes, maturity)
382            .await
383            .unwrap();
384        let announcement = serde_json::to_vec(&announcement).unwrap();
385        Ok(Response::new(CreateEnumResponse { announcement }))
386    }
387
388    async fn wallet_sync(
389        &self,
390        _request: Request<WalletSyncRequest>,
391    ) -> Result<Response<WalletSyncResponse>, Status> {
392        self.node
393            .wallet
394            .sync()
395            .await
396            .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
397        Ok(Response::new(WalletSyncResponse {}))
398    }
399
400    async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
401        if let Err(e) = self.node.manager.periodic_check(false).await {
402            tracing::error!("Error syncing: {:?}", e);
403            return Err(Status::new(Code::Internal, "Error syncing."));
404        };
405
406        if let Err(e) = self.node.wallet.sync().await {
407            tracing::error!("Error syncing wallet: {:?}", e);
408            return Err(Status::new(Code::Internal, "Error syncing wallet."));
409        };
410
411        Ok(Response::new(SyncResponse {}))
412    }
413
414    async fn create_numeric(
415        &self,
416        request: Request<CreateNumericRequest>,
417    ) -> Result<Response<CreateNumericResponse>, Status> {
418        let CreateNumericRequest {
419            maturity,
420            nb_digits,
421        } = request.into_inner();
422        let announcement = self
423            .node
424            .oracle
425            .create_numeric_event(
426                Some(nb_digits as u16), // Number of digits
427                None,                   // Default is_signed to false
428                None,                   // Default precision to 0
429                "unit".to_string(),     // Default unit
430                maturity,               // Maturity timestamp
431            )
432            .await
433            .map_err(|e| Status::internal(e.to_string()))?;
434
435        let announcement_bytes =
436            serde_json::to_vec(&announcement).map_err(|e| Status::internal(e.to_string()))?;
437
438        Ok(Response::new(CreateNumericResponse {
439            announcement: announcement_bytes,
440        }))
441    }
442
443    async fn sign_announcement(
444        &self,
445        request: Request<SignRequest>,
446    ) -> Result<Response<SignResponse>, Status> {
447        let SignRequest { event_id, outcome } = request.into_inner();
448        let attestation = match outcome {
449            Some(ddkrpc::sign_request::Outcome::EnumOutcome(outcome)) => {
450                self.node.oracle.sign_enum_event(event_id, outcome).await
451            }
452            Some(ddkrpc::sign_request::Outcome::NumericOutcome(outcome)) => {
453                self.node.oracle.sign_numeric_event(event_id, outcome).await
454            }
455            None => return Err(Status::invalid_argument("Outcome must be specified")),
456        }
457        .map_err(|e| Status::internal(e.to_string()))?;
458
459        let signature =
460            serde_json::to_vec(&attestation).map_err(|e| Status::internal(e.to_string()))?;
461
462        Ok(Response::new(SignResponse { signature }))
463    }
464}