1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::{Builder, SeedConfig};
10use ddk::logger::{LogLevel, Logger};
11use ddk::oracle::kormir::KormirOracleClient;
12use ddk::storage::postgres::PostgresStore;
13use ddk::transport::nostr::NostrDlc;
14use ddk::util::ser::serialize_contract;
15use ddk::DlcDevKit;
16use ddk::{Oracle, Transport};
17use ddk_manager::contract::contract_input::ContractInput;
18use ddk_manager::Oracle as DlcOracle;
19use ddk_manager::Storage as DlcStorage;
20use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
21use ddkrpc::{
22 AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
23 CreateEnumResponse, GetWalletTransactionsRequest, GetWalletTransactionsResponse,
24 ListContractsRequest, ListContractsResponse, ListOffersRequest, ListOffersResponse,
25 ListOraclesRequest, ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest,
26 ListUtxosResponse, NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest,
27 OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse,
28 SyncRequest, SyncResponse, WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest,
29 WalletSyncResponse,
30};
31use ddkrpc::{InfoRequest, InfoResponse};
32use opts::NodeOpts;
33use std::str::FromStr;
34use std::sync::Arc;
35use tonic::transport::Server;
36use tonic::Request;
37use tonic::Response;
38use tonic::Status;
39use tonic::{async_trait, Code};
40
41type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
42
43#[derive(Clone)]
44pub struct DdkNode {
45 pub node: Arc<Ddk>,
46}
47
48impl DdkNode {
49 pub fn new(ddk: Ddk) -> Self {
50 Self {
51 node: Arc::new(ddk),
52 }
53 }
54
55 pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
56 let logger = Arc::new(Logger::console(
57 "console_logger".to_string(),
58 LogLevel::Info,
59 ));
60 let storage_path = match opts.storage_dir {
61 Some(storage) => storage,
62 None => homedir::my_home()
63 .expect("Provide a directory for ddk.")
64 .unwrap()
65 .join(".ddk")
66 .join("default-ddk"),
67 };
68 let network = Network::from_str(&opts.network)?;
69 std::fs::create_dir_all(storage_path.clone())?;
70
71 let seed_bytes = crate::seed::xprv_from_path(storage_path.clone())?;
72
73 let transport = Arc::new(
74 NostrDlc::new(
75 &seed_bytes,
76 "wss://nostr.dlcdevkit.com",
77 network,
78 logger.clone(),
79 )
80 .await?,
81 );
82
83 let storage = Arc::new(
84 PostgresStore::new(&opts.postgres_url, true, logger.clone(), opts.name).await?,
85 );
86
87 let oracle =
89 Arc::new(KormirOracleClient::new(&opts.oracle_host, None, logger.clone()).await?);
90
91 let mut builder = Builder::new();
92 builder.set_seed_bytes(SeedConfig::Bytes(seed_bytes))?;
93 builder.set_esplora_host(opts.esplora_host);
94 builder.set_network(network);
95 builder.set_transport(transport.clone());
96 builder.set_storage(storage.clone());
97 builder.set_oracle(oracle.clone());
98 builder.set_logger(logger.clone());
99
100 let ddk: Ddk = builder.finish().await?;
101
102 ddk.start()?;
103 let node = DdkNode::new(ddk);
104 let node_stop = node.node.clone();
105 let server = Server::builder()
106 .add_service(DdkRpcServer::new(node))
107 .serve_with_shutdown(opts.grpc_host.parse()?, async {
108 tokio::signal::ctrl_c()
109 .await
110 .expect("Failed to install Ctrl+C signal handler");
111 let _ = node_stop.stop();
112 });
113
114 server.await?;
115
116 Ok(())
117 }
118}
119
120#[async_trait]
121impl DdkRpc for DdkNode {
122 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
123 async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
124 tracing::info!("Request for node info.");
125 let pubkey = self.node.transport.public_key().to_string();
126 let transport = self.node.transport.name();
127 let oracle = self.node.oracle.name();
128 let response = InfoResponse {
129 pubkey,
130 transport,
131 oracle,
132 };
133 Ok(Response::new(response))
134 }
135
136 #[tracing::instrument(skip(self, request), name = "grpc_server")]
137 async fn send_offer(
138 &self,
139 request: Request<SendOfferRequest>,
140 ) -> Result<Response<SendOfferResponse>, Status> {
141 tracing::info!("Request to send offer.");
142 let SendOfferRequest {
143 contract_input,
144 counter_party,
145 } = request.into_inner();
146 let contract_input: ContractInput =
147 serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
148 let mut oracle_announcements = Vec::new();
149 for info in &contract_input.contract_infos {
150 let announcement = self
151 .node
152 .oracle
153 .get_announcement(&info.oracles.event_id)
154 .await
155 .unwrap();
156 oracle_announcements.push(announcement)
157 }
158
159 let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
160 let offer_msg = self
161 .node
162 .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
163 .await
164 .map_err(|e| {
165 Status::new(
166 Code::Cancelled,
167 format!("Contract offer could not be sent to counterparty. error={e}"),
168 )
169 })?;
170
171 let offer_dlc =
172 serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
173 Ok(Response::new(SendOfferResponse { offer_dlc }))
174 }
175
176 #[tracing::instrument(skip(self, request), name = "grpc_server")]
177 async fn accept_offer(
178 &self,
179 request: Request<AcceptOfferRequest>,
180 ) -> Result<Response<AcceptOfferResponse>, Status> {
181 tracing::info!("Request to accept offer.");
182 let mut contract_id = [0u8; 32];
183 let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
184 contract_id.copy_from_slice(&contract_id_bytes);
185 let (contract_id, counter_party, accept_dlc) =
186 self.node.accept_dlc_offer(contract_id).await.map_err(|e| {
187 Status::new(
188 Code::Cancelled,
189 format!("Contract could not be accepted. error={e:?}"),
190 )
191 })?;
192
193 let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
194 Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
195 })?;
196
197 Ok(Response::new(AcceptOfferResponse {
198 contract_id,
199 counter_party,
200 accept_dlc,
201 }))
202 }
203
204 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
205 async fn new_address(
206 &self,
207 _request: Request<NewAddressRequest>,
208 ) -> Result<Response<NewAddressResponse>, Status> {
209 tracing::info!("Request for new wallet address");
210 let address = self
211 .node
212 .wallet
213 .new_external_address()
214 .await
215 .unwrap()
216 .to_string();
217 let response = NewAddressResponse { address };
218 Ok(Response::new(response))
219 }
220
221 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
222 async fn list_offers(
223 &self,
224 _request: Request<ListOffersRequest>,
225 ) -> Result<Response<ListOffersResponse>, Status> {
226 tracing::info!("Request for offers to the node.");
227 let offers = self.node.storage.get_contract_offers().await.unwrap();
228 let offers: Vec<Vec<u8>> = offers
229 .iter()
230 .map(|offer| serde_json::to_vec(offer).unwrap())
231 .collect();
232
233 Ok(Response::new(ListOffersResponse { offers }))
234 }
235
236 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
237 async fn wallet_balance(
238 &self,
239 _request: Request<WalletBalanceRequest>,
240 ) -> Result<Response<WalletBalanceResponse>, Status> {
241 tracing::info!("Request for wallet balance.");
242 let wallet_balance = self.node.balance().await.unwrap();
243
244 let response = WalletBalanceResponse {
245 confirmed: wallet_balance.confirmed.to_sat(),
246 foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
247 change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
248 contract_balance: wallet_balance.contract_pnl,
249 };
250 Ok(Response::new(response))
251 }
252
253 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
254 async fn get_wallet_transactions(
255 &self,
256 _request: Request<GetWalletTransactionsRequest>,
257 ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
258 tracing::info!("Request for all wallet transactions.");
259 let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
260 let transactions: Vec<Vec<u8>> = wallet_transactions
261 .iter()
262 .map(|t| serde_json::to_vec(&t).unwrap())
263 .collect();
264 Ok(Response::new(GetWalletTransactionsResponse {
265 transactions,
266 }))
267 }
268
269 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
270 async fn list_utxos(
271 &self,
272 _request: Request<ListUtxosRequest>,
273 ) -> Result<Response<ListUtxosResponse>, Status> {
274 tracing::info!("Request to list all wallet utxos");
275 let utxos = self.node.wallet.list_utxos().await.unwrap();
276 let utxos: Vec<Vec<u8>> = utxos
277 .iter()
278 .map(|utxo| serde_json::to_vec(utxo).unwrap())
279 .collect();
280 Ok(Response::new(ListUtxosResponse { utxos }))
281 }
282
283 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
284 async fn list_peers(
285 &self,
286 _request: Request<ListPeersRequest>,
287 ) -> Result<Response<ListPeersResponse>, Status> {
288 tracing::info!("List peers request");
289 let peers = vec![];
290
291 Ok(Response::new(ListPeersResponse { peers }))
292 }
293
294 #[tracing::instrument(skip(self, request), name = "grpc_server")]
295 async fn connect_peer(
296 &self,
297 request: Request<ConnectRequest>,
298 ) -> Result<Response<ConnectResponse>, Status> {
299 let ConnectRequest { pubkey, host } = request.into_inner();
300 let pubkey = PublicKey::from_str(&pubkey).unwrap();
301 self.node.transport.connect_outbound(pubkey, &host).await;
302 Ok(Response::new(ConnectResponse {}))
303 }
304
305 async fn list_oracles(
306 &self,
307 _request: Request<ListOraclesRequest>,
308 ) -> Result<Response<ListOraclesResponse>, Status> {
309 let pubkey = self.node.oracle.get_public_key().to_string();
310 let name = self.node.oracle.name();
311 Ok(Response::new(ListOraclesResponse { name, pubkey }))
312 }
313
314 async fn list_contracts(
315 &self,
316 _request: Request<ListContractsRequest>,
317 ) -> Result<Response<ListContractsResponse>, Status> {
318 let contracts = self
319 .node
320 .storage
321 .get_contracts()
322 .await
323 .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
324 let contract_bytes: Vec<Vec<u8>> = contracts
325 .iter()
326 .map(|contract| serialize_contract(contract).unwrap())
327 .collect();
328 Ok(Response::new(ListContractsResponse {
329 contracts: contract_bytes,
330 }))
331 }
332
333 async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
334 let SendRequest {
335 address,
336 amount,
337 fee_rate,
338 } = request.into_inner();
339 let address = Address::from_str(&address).unwrap().assume_checked();
340 let amount = Amount::from_sat(amount);
341 let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
342 Some(f) => f,
343 None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
344 };
345 let txn = self
346 .node
347 .wallet
348 .send_to_address(address, amount, fee_rate)
349 .await;
350 if let Ok(tx) = txn {
351 Ok(Response::new(SendResponse {
352 txid: tx.to_string(),
353 }))
354 } else {
355 Err(Status::new(Code::Internal, "Transaction sending failed."))
356 }
357 }
358
359 async fn oracle_announcements(
360 &self,
361 request: Request<OracleAnnouncementsRequest>,
362 ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
363 let OracleAnnouncementsRequest { event_id } = request.into_inner();
364 let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
365 let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
366 Ok(Response::new(OracleAnnouncementsResponse { announcement }))
367 }
368
369 async fn create_enum(
370 &self,
371 request: Request<CreateEnumRequest>,
372 ) -> Result<Response<CreateEnumResponse>, Status> {
373 let CreateEnumRequest { maturity, outcomes } = request.into_inner();
374 let announcement = self
375 .node
376 .oracle
377 .create_enum_event(outcomes, maturity)
378 .await
379 .unwrap();
380 let announcement = serde_json::to_vec(&announcement).unwrap();
381 Ok(Response::new(CreateEnumResponse { announcement }))
382 }
383
384 async fn wallet_sync(
385 &self,
386 _request: Request<WalletSyncRequest>,
387 ) -> Result<Response<WalletSyncResponse>, Status> {
388 self.node
389 .wallet
390 .sync()
391 .await
392 .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
393 Ok(Response::new(WalletSyncResponse {}))
394 }
395
396 async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
397 if let Err(e) = self.node.manager.periodic_check(false).await {
398 tracing::error!("Error syncing: {:?}", e);
399 return Err(Status::new(Code::Internal, "Error syncing."));
400 };
401
402 if let Err(e) = self.node.wallet.sync().await {
403 tracing::error!("Error syncing wallet: {:?}", e);
404 return Err(Status::new(Code::Internal, "Error syncing wallet."));
405 };
406
407 Ok(Response::new(SyncResponse {}))
408 }
409}