1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::{Builder, SeedConfig};
10use ddk::logger::{LogLevel, Logger};
11use ddk::oracle::kormir::KormirOracleClient;
12use ddk::storage::postgres::PostgresStore;
13use ddk::transport::nostr::NostrDlc;
14use ddk::util::ser::serialize_contract;
15use ddk::DlcDevKit;
16use ddk::{Oracle, Transport};
17use ddk_manager::contract::contract_input::ContractInput;
18use ddk_manager::Oracle as DlcOracle;
19use ddk_manager::Storage as DlcStorage;
20use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
21use ddkrpc::{
22 AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
23 CreateEnumResponse, CreateNumericRequest, CreateNumericResponse, GetWalletTransactionsRequest,
24 GetWalletTransactionsResponse, ListContractsRequest, ListContractsResponse, ListOffersRequest,
25 ListOffersResponse, ListOraclesRequest, ListOraclesResponse, ListPeersRequest,
26 ListPeersResponse, ListUtxosRequest, ListUtxosResponse, NewAddressRequest, NewAddressResponse,
27 OracleAnnouncementsRequest, OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse,
28 SendRequest, SendResponse, SignRequest, SignResponse, SyncRequest, SyncResponse,
29 WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest, WalletSyncResponse,
30};
31use ddkrpc::{InfoRequest, InfoResponse};
32use opts::NodeOpts;
33use std::str::FromStr;
34use std::sync::Arc;
35use tonic::transport::Server;
36use tonic::Request;
37use tonic::Response;
38use tonic::Status;
39use tonic::{async_trait, Code};
40
41type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
42
43#[derive(Clone)]
44pub struct DdkNode {
45 pub node: Arc<Ddk>,
46}
47
48impl DdkNode {
49 pub fn new(ddk: Ddk) -> Self {
50 Self {
51 node: Arc::new(ddk),
52 }
53 }
54
55 pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
56 let logger = Arc::new(Logger::console(
57 "console_logger".to_string(),
58 LogLevel::from(opts.log),
59 ));
60 let storage_path = match opts.storage_dir {
61 Some(storage) => storage,
62 None => homedir::my_home()
63 .expect("Provide a directory for ddk.")
64 .unwrap()
65 .join(".ddk")
66 .join("default-ddk"),
67 };
68 let network = Network::from_str(&opts.network)?;
69 std::fs::create_dir_all(storage_path.clone())?;
70
71 let seed_bytes = crate::seed::xprv_from_path(storage_path.clone())?;
72
73 let transport = Arc::new(
74 NostrDlc::new(
75 &seed_bytes,
76 "wss://nostr.dlcdevkit.com",
77 network,
78 logger.clone(),
79 )
80 .await?,
81 );
82
83 let storage = Arc::new(
84 PostgresStore::new(&opts.postgres_url, true, logger.clone(), opts.name).await?,
85 );
86
87 let oracle =
89 Arc::new(KormirOracleClient::new(&opts.oracle_host, None, logger.clone()).await?);
90
91 let mut builder = Builder::new();
92 builder.set_seed_bytes(SeedConfig::Bytes(seed_bytes))?;
93 builder.set_esplora_host(opts.esplora_host);
94 builder.set_network(network);
95 builder.set_transport(transport.clone());
96 builder.set_storage(storage.clone());
97 builder.set_oracle(oracle.clone());
98 builder.set_logger(logger.clone());
99
100 if let Some(endpoint) = opts.zmq_blockhash_endpoint.filter(|e| !e.is_empty()) {
101 builder.set_zmq_blockhash_endpoint(endpoint);
102 }
103
104 let ddk: Ddk = builder.finish().await?;
105
106 ddk.start()?;
107 let node = DdkNode::new(ddk);
108 let node_stop = node.node.clone();
109 let server = Server::builder()
110 .add_service(DdkRpcServer::new(node))
111 .serve_with_shutdown(opts.grpc_host.parse()?, async {
112 tokio::signal::ctrl_c()
113 .await
114 .expect("Failed to install Ctrl+C signal handler");
115 let _ = node_stop.stop();
116 });
117
118 server.await?;
119
120 Ok(())
121 }
122}
123
124#[async_trait]
125impl DdkRpc for DdkNode {
126 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
127 async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
128 tracing::info!("Request for node info.");
129 let pubkey = self.node.transport.public_key().to_string();
130 let transport = self.node.transport.name();
131 let oracle = self.node.oracle.name();
132 let response = InfoResponse {
133 pubkey,
134 transport,
135 oracle,
136 };
137 Ok(Response::new(response))
138 }
139
140 #[tracing::instrument(skip(self, request), name = "grpc_server")]
141 async fn send_offer(
142 &self,
143 request: Request<SendOfferRequest>,
144 ) -> Result<Response<SendOfferResponse>, Status> {
145 tracing::info!("Request to send offer.");
146 let SendOfferRequest {
147 contract_input,
148 counter_party,
149 } = request.into_inner();
150 let contract_input: ContractInput =
151 serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
152 let mut oracle_announcements = Vec::new();
153 for info in &contract_input.contract_infos {
154 let announcement = self
155 .node
156 .oracle
157 .get_announcement(&info.oracles.event_id)
158 .await
159 .unwrap();
160 oracle_announcements.push(announcement)
161 }
162
163 let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
164 let offer_msg = self
165 .node
166 .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
167 .await
168 .map_err(|e| {
169 Status::new(
170 Code::Cancelled,
171 format!("Contract offer could not be sent to counterparty. error={e}"),
172 )
173 })?;
174
175 let offer_dlc =
176 serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
177 Ok(Response::new(SendOfferResponse { offer_dlc }))
178 }
179
180 #[tracing::instrument(skip(self, request), name = "grpc_server")]
181 async fn accept_offer(
182 &self,
183 request: Request<AcceptOfferRequest>,
184 ) -> Result<Response<AcceptOfferResponse>, Status> {
185 tracing::info!("Request to accept offer.");
186 let mut contract_id = [0u8; 32];
187 let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
188 contract_id.copy_from_slice(&contract_id_bytes);
189 let (contract_id, counter_party, accept_dlc) =
190 self.node.accept_dlc_offer(contract_id).await.map_err(|e| {
191 Status::new(
192 Code::Cancelled,
193 format!("Contract could not be accepted. error={e:?}"),
194 )
195 })?;
196
197 let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
198 Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
199 })?;
200
201 Ok(Response::new(AcceptOfferResponse {
202 contract_id,
203 counter_party,
204 accept_dlc,
205 }))
206 }
207
208 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
209 async fn new_address(
210 &self,
211 _request: Request<NewAddressRequest>,
212 ) -> Result<Response<NewAddressResponse>, Status> {
213 tracing::info!("Request for new wallet address");
214 let address = self
215 .node
216 .wallet
217 .new_external_address()
218 .await
219 .unwrap()
220 .to_string();
221 let response = NewAddressResponse { address };
222 Ok(Response::new(response))
223 }
224
225 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
226 async fn list_offers(
227 &self,
228 _request: Request<ListOffersRequest>,
229 ) -> Result<Response<ListOffersResponse>, Status> {
230 tracing::info!("Request for offers to the node.");
231 let offers = self.node.storage.get_contract_offers().await.unwrap();
232 let offers: Vec<Vec<u8>> = offers
233 .iter()
234 .map(|offer| serde_json::to_vec(offer).unwrap())
235 .collect();
236
237 Ok(Response::new(ListOffersResponse { offers }))
238 }
239
240 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
241 async fn wallet_balance(
242 &self,
243 _request: Request<WalletBalanceRequest>,
244 ) -> Result<Response<WalletBalanceResponse>, Status> {
245 tracing::info!("Request for wallet balance.");
246 let wallet_balance = self.node.balance().await.unwrap();
247
248 let response = WalletBalanceResponse {
249 confirmed: wallet_balance.confirmed.to_sat(),
250 foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
251 change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
252 contract_balance: wallet_balance.contract_pnl,
253 };
254 Ok(Response::new(response))
255 }
256
257 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
258 async fn get_wallet_transactions(
259 &self,
260 _request: Request<GetWalletTransactionsRequest>,
261 ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
262 tracing::info!("Request for all wallet transactions.");
263 let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
264 let transactions: Vec<Vec<u8>> = wallet_transactions
265 .iter()
266 .map(|t| serde_json::to_vec(&t).unwrap())
267 .collect();
268 Ok(Response::new(GetWalletTransactionsResponse {
269 transactions,
270 }))
271 }
272
273 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
274 async fn list_utxos(
275 &self,
276 _request: Request<ListUtxosRequest>,
277 ) -> Result<Response<ListUtxosResponse>, Status> {
278 tracing::info!("Request to list all wallet utxos");
279 let utxos = self.node.wallet.list_utxos().await.unwrap();
280 let utxos: Vec<Vec<u8>> = utxos
281 .iter()
282 .map(|utxo| serde_json::to_vec(utxo).unwrap())
283 .collect();
284 Ok(Response::new(ListUtxosResponse { utxos }))
285 }
286
287 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
288 async fn list_peers(
289 &self,
290 _request: Request<ListPeersRequest>,
291 ) -> Result<Response<ListPeersResponse>, Status> {
292 tracing::info!("List peers request");
293 let peers = vec![];
294
295 Ok(Response::new(ListPeersResponse { peers }))
296 }
297
298 #[tracing::instrument(skip(self, request), name = "grpc_server")]
299 async fn connect_peer(
300 &self,
301 request: Request<ConnectRequest>,
302 ) -> Result<Response<ConnectResponse>, Status> {
303 let ConnectRequest { pubkey, host } = request.into_inner();
304 let pubkey = PublicKey::from_str(&pubkey).unwrap();
305 self.node.transport.connect_outbound(pubkey, &host).await;
306 Ok(Response::new(ConnectResponse {}))
307 }
308
309 async fn list_oracles(
310 &self,
311 _request: Request<ListOraclesRequest>,
312 ) -> Result<Response<ListOraclesResponse>, Status> {
313 let pubkey = self.node.oracle.get_public_key().to_string();
314 let name = self.node.oracle.name();
315 Ok(Response::new(ListOraclesResponse { name, pubkey }))
316 }
317
318 async fn list_contracts(
319 &self,
320 _request: Request<ListContractsRequest>,
321 ) -> Result<Response<ListContractsResponse>, Status> {
322 let contracts = self
323 .node
324 .storage
325 .get_contracts()
326 .await
327 .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
328 let contract_bytes: Vec<Vec<u8>> = contracts
329 .iter()
330 .map(|contract| serialize_contract(contract).unwrap())
331 .collect();
332 Ok(Response::new(ListContractsResponse {
333 contracts: contract_bytes,
334 }))
335 }
336
337 async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
338 let SendRequest {
339 address,
340 amount,
341 fee_rate,
342 } = request.into_inner();
343 let address = Address::from_str(&address).unwrap().assume_checked();
344 let amount = Amount::from_sat(amount);
345 let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
346 Some(f) => f,
347 None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
348 };
349 let txn = self
350 .node
351 .wallet
352 .send_to_address(address, amount, fee_rate)
353 .await;
354 if let Ok(tx) = txn {
355 Ok(Response::new(SendResponse {
356 txid: tx.to_string(),
357 }))
358 } else {
359 Err(Status::new(Code::Internal, "Transaction sending failed."))
360 }
361 }
362
363 async fn oracle_announcements(
364 &self,
365 request: Request<OracleAnnouncementsRequest>,
366 ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
367 let OracleAnnouncementsRequest { event_id } = request.into_inner();
368 let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
369 let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
370 Ok(Response::new(OracleAnnouncementsResponse { announcement }))
371 }
372
373 async fn create_enum(
374 &self,
375 request: Request<CreateEnumRequest>,
376 ) -> Result<Response<CreateEnumResponse>, Status> {
377 let CreateEnumRequest { maturity, outcomes } = request.into_inner();
378 let announcement = self
379 .node
380 .oracle
381 .create_enum_event(outcomes, maturity)
382 .await
383 .unwrap();
384 let announcement = serde_json::to_vec(&announcement).unwrap();
385 Ok(Response::new(CreateEnumResponse { announcement }))
386 }
387
388 async fn wallet_sync(
389 &self,
390 _request: Request<WalletSyncRequest>,
391 ) -> Result<Response<WalletSyncResponse>, Status> {
392 self.node
393 .wallet
394 .sync()
395 .await
396 .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
397 Ok(Response::new(WalletSyncResponse {}))
398 }
399
400 async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
401 if let Err(e) = self.node.manager.periodic_check(false).await {
402 tracing::error!("Error syncing: {:?}", e);
403 return Err(Status::new(Code::Internal, "Error syncing."));
404 };
405
406 if let Err(e) = self.node.wallet.sync().await {
407 tracing::error!("Error syncing wallet: {:?}", e);
408 return Err(Status::new(Code::Internal, "Error syncing wallet."));
409 };
410
411 Ok(Response::new(SyncResponse {}))
412 }
413
414 async fn create_numeric(
415 &self,
416 request: Request<CreateNumericRequest>,
417 ) -> Result<Response<CreateNumericResponse>, Status> {
418 let CreateNumericRequest {
419 maturity,
420 nb_digits,
421 } = request.into_inner();
422 let announcement = self
423 .node
424 .oracle
425 .create_numeric_event(
426 Some(nb_digits as u16), None, None, "unit".to_string(), maturity, )
432 .await
433 .map_err(|e| Status::internal(e.to_string()))?;
434
435 let announcement_bytes =
436 serde_json::to_vec(&announcement).map_err(|e| Status::internal(e.to_string()))?;
437
438 Ok(Response::new(CreateNumericResponse {
439 announcement: announcement_bytes,
440 }))
441 }
442
443 async fn sign_announcement(
444 &self,
445 request: Request<SignRequest>,
446 ) -> Result<Response<SignResponse>, Status> {
447 let SignRequest { event_id, outcome } = request.into_inner();
448 let attestation = match outcome {
449 Some(ddkrpc::sign_request::Outcome::EnumOutcome(outcome)) => {
450 self.node.oracle.sign_enum_event(event_id, outcome).await
451 }
452 Some(ddkrpc::sign_request::Outcome::NumericOutcome(outcome)) => {
453 self.node.oracle.sign_numeric_event(event_id, outcome).await
454 }
455 None => return Err(Status::invalid_argument("Outcome must be specified")),
456 }
457 .map_err(|e| Status::internal(e.to_string()))?;
458
459 let signature =
460 serde_json::to_vec(&attestation).map_err(|e| Status::internal(e.to_string()))?;
461
462 Ok(Response::new(SignResponse { signature }))
463 }
464}