1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::Builder;
10use ddk::oracle::kormir::KormirOracleClient;
11use ddk::storage::postgres::PostgresStore;
12use ddk::transport::nostr::NostrDlc;
13use ddk::util::ser::serialize_contract;
14use ddk::DlcDevKit;
15use ddk::{Oracle, Transport};
16use ddk_manager::contract::contract_input::ContractInput;
17use ddk_manager::Oracle as DlcOracle;
18use ddk_manager::Storage as DlcStorage;
19use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
20use ddkrpc::{
21 AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
22 CreateEnumResponse, GetWalletTransactionsRequest, GetWalletTransactionsResponse,
23 ListContractsRequest, ListContractsResponse, ListOffersRequest, ListOffersResponse,
24 ListOraclesRequest, ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest,
25 ListUtxosResponse, NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest,
26 OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse,
27 SyncRequest, SyncResponse, WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest,
28 WalletSyncResponse,
29};
30use ddkrpc::{InfoRequest, InfoResponse};
31use opts::NodeOpts;
32use std::str::FromStr;
33use std::sync::Arc;
34use tonic::transport::Server;
35use tonic::Request;
36use tonic::Response;
37use tonic::Status;
38use tonic::{async_trait, Code};
39
/// Concrete DDK stack used by this node: `NostrDlc` as the transport,
/// `PostgresStore` as the storage backend and `KormirOracleClient` as the oracle.
40type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
41
42#[derive(Clone)]
43pub struct DdkNode {
44 pub node: Arc<Ddk>,
45}
46
47impl DdkNode {
48 pub fn new(ddk: Ddk) -> Self {
49 Self {
50 node: Arc::new(ddk),
51 }
52 }
53
54 pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
55 let storage_path = match opts.storage_dir {
56 Some(storage) => storage,
57 None => homedir::my_home()
58 .expect("Provide a directory for ddk.")
59 .unwrap()
60 .join(".ddk")
61 .join("default-ddk"),
62 };
63 let network = Network::from_str(&opts.network)?;
64 std::fs::create_dir_all(storage_path.clone())?;
65
66 let seed_bytes = crate::seed::xprv_from_path(storage_path.clone(), network)?;
67
68 tracing::info!("Starting DDK node.");
69
70 let transport = Arc::new(
71 NostrDlc::new(
72 &seed_bytes.private_key.secret_bytes(),
73 "wss://nostr.dlcdevkit.com",
74 network,
75 )
76 .await?,
77 );
78
79 let storage = Arc::new(PostgresStore::new(&opts.postgres_url, true, opts.name).await?);
80
81 let oracle = Arc::new(KormirOracleClient::new(&opts.oracle_host, None).await?);
83
84 let mut builder = Builder::new();
85 builder.set_seed_bytes(seed_bytes.private_key.secret_bytes());
86 builder.set_esplora_host(opts.esplora_host);
87 builder.set_network(network);
88 builder.set_transport(transport.clone());
89 builder.set_storage(storage.clone());
90 builder.set_oracle(oracle.clone());
91
92 let ddk: Ddk = builder.finish().await?;
93
94 ddk.start()?;
95 let node = DdkNode::new(ddk);
96 let node_stop = node.node.clone();
97 let server = Server::builder()
98 .add_service(DdkRpcServer::new(node))
99 .serve_with_shutdown(opts.grpc_host.parse()?, async {
100 tokio::signal::ctrl_c()
101 .await
102 .expect("Failed to install Ctrl+C signal handler");
103 let _ = node_stop.stop();
104 });
105
106 server.await?;
107
108 Ok(())
109 }
110}
111
112#[async_trait]
113impl DdkRpc for DdkNode {
114 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
115 async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
116 tracing::info!("Request for node info.");
117 let pubkey = self.node.transport.public_key().to_string();
118 let transport = self.node.transport.name();
119 let oracle = self.node.oracle.name();
120 let response = InfoResponse {
121 pubkey,
122 transport,
123 oracle,
124 };
125 Ok(Response::new(response))
126 }
127
128 #[tracing::instrument(skip(self, request), name = "grpc_server")]
129 async fn send_offer(
130 &self,
131 request: Request<SendOfferRequest>,
132 ) -> Result<Response<SendOfferResponse>, Status> {
133 tracing::info!("Request to send offer.");
134 let SendOfferRequest {
135 contract_input,
136 counter_party,
137 } = request.into_inner();
138 let contract_input: ContractInput =
139 serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
140 let mut oracle_announcements = Vec::new();
141 for info in &contract_input.contract_infos {
142 let announcement = self
143 .node
144 .oracle
145 .get_announcement(&info.oracles.event_id)
146 .await
147 .unwrap();
148 oracle_announcements.push(announcement)
149 }
150
151 let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
152 let offer_msg = self
153 .node
154 .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
155 .await
156 .map_err(|e| {
157 Status::new(
158 Code::Cancelled,
159 format!("Contract offer could not be sent to counterparty. error={e}"),
160 )
161 })?;
162
163 let offer_dlc =
164 serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
165 Ok(Response::new(SendOfferResponse { offer_dlc }))
166 }
167
168 #[tracing::instrument(skip(self, request), name = "grpc_server")]
169 async fn accept_offer(
170 &self,
171 request: Request<AcceptOfferRequest>,
172 ) -> Result<Response<AcceptOfferResponse>, Status> {
173 tracing::info!("Request to accept offer.");
174 let mut contract_id = [0u8; 32];
175 let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
176 contract_id.copy_from_slice(&contract_id_bytes);
177 let (contract_id, counter_party, accept_dlc) = self
178 .node
179 .accept_dlc_offer(contract_id)
180 .await
181 .map_err(|_| Status::new(Code::Cancelled, "Contract could not be accepted."))?;
182
183 let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
184 Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
185 })?;
186
187 Ok(Response::new(AcceptOfferResponse {
188 contract_id,
189 counter_party,
190 accept_dlc,
191 }))
192 }
193
194 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
195 async fn new_address(
196 &self,
197 _request: Request<NewAddressRequest>,
198 ) -> Result<Response<NewAddressResponse>, Status> {
199 tracing::info!("Request for new wallet address");
200 let address = self
201 .node
202 .wallet
203 .new_external_address()
204 .await
205 .unwrap()
206 .to_string();
207 let response = NewAddressResponse { address };
208 Ok(Response::new(response))
209 }
210
211 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
212 async fn list_offers(
213 &self,
214 _request: Request<ListOffersRequest>,
215 ) -> Result<Response<ListOffersResponse>, Status> {
216 tracing::info!("Request for offers to the node.");
217 let offers = self.node.storage.get_contract_offers().await.unwrap();
218 tracing::info!("Offers: {:?}", offers);
219 let offers: Vec<Vec<u8>> = offers
220 .iter()
221 .map(|offer| serde_json::to_vec(offer).unwrap())
222 .collect();
223
224 Ok(Response::new(ListOffersResponse { offers }))
225 }
226
227 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
228 async fn wallet_balance(
229 &self,
230 _request: Request<WalletBalanceRequest>,
231 ) -> Result<Response<WalletBalanceResponse>, Status> {
232 tracing::info!("Request for wallet balance.");
233 let wallet_balance = self.node.balance().await.unwrap();
234
235 let response = WalletBalanceResponse {
236 confirmed: wallet_balance.confirmed.to_sat(),
237 foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
238 change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
239 contract_balance: wallet_balance.contract_pnl,
240 };
241 Ok(Response::new(response))
242 }
243
244 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
245 async fn get_wallet_transactions(
246 &self,
247 _request: Request<GetWalletTransactionsRequest>,
248 ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
249 tracing::info!("Request for all wallet transactions.");
250 let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
251 let transactions: Vec<Vec<u8>> = wallet_transactions
252 .iter()
253 .map(|t| serde_json::to_vec(&t).unwrap())
254 .collect();
255 Ok(Response::new(GetWalletTransactionsResponse {
256 transactions,
257 }))
258 }
259
260 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
261 async fn list_utxos(
262 &self,
263 _request: Request<ListUtxosRequest>,
264 ) -> Result<Response<ListUtxosResponse>, Status> {
265 tracing::info!("Request to list all wallet utxos");
266 let utxos = self.node.wallet.list_utxos().await.unwrap();
267 let utxos: Vec<Vec<u8>> = utxos
268 .iter()
269 .map(|utxo| serde_json::to_vec(utxo).unwrap())
270 .collect();
271 Ok(Response::new(ListUtxosResponse { utxos }))
272 }
273
274 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
275 async fn list_peers(
276 &self,
277 _request: Request<ListPeersRequest>,
278 ) -> Result<Response<ListPeersResponse>, Status> {
279 tracing::info!("List peers request");
280 let peers = vec![];
281
282 Ok(Response::new(ListPeersResponse { peers }))
283 }
284
285 #[tracing::instrument(skip(self, request), name = "grpc_server")]
286 async fn connect_peer(
287 &self,
288 request: Request<ConnectRequest>,
289 ) -> Result<Response<ConnectResponse>, Status> {
290 let ConnectRequest { pubkey, host } = request.into_inner();
291 let pubkey = PublicKey::from_str(&pubkey).unwrap();
292 self.node.transport.connect_outbound(pubkey, &host).await;
293 Ok(Response::new(ConnectResponse {}))
294 }
295
296 async fn list_oracles(
297 &self,
298 _request: Request<ListOraclesRequest>,
299 ) -> Result<Response<ListOraclesResponse>, Status> {
300 let pubkey = self.node.oracle.get_public_key().to_string();
301 let name = self.node.oracle.name();
302 Ok(Response::new(ListOraclesResponse { name, pubkey }))
303 }
304
305 async fn list_contracts(
306 &self,
307 _request: Request<ListContractsRequest>,
308 ) -> Result<Response<ListContractsResponse>, Status> {
309 let contracts = self
310 .node
311 .storage
312 .get_contracts()
313 .await
314 .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
315 let contract_bytes: Vec<Vec<u8>> = contracts
316 .iter()
317 .map(|contract| serialize_contract(contract).unwrap())
318 .collect();
319 Ok(Response::new(ListContractsResponse {
320 contracts: contract_bytes,
321 }))
322 }
323
324 async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
325 let SendRequest {
326 address,
327 amount,
328 fee_rate,
329 } = request.into_inner();
330 let address = Address::from_str(&address).unwrap().assume_checked();
331 let amount = Amount::from_sat(amount);
332 let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
333 Some(f) => f,
334 None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
335 };
336 let txn = self
337 .node
338 .wallet
339 .send_to_address(address, amount, fee_rate)
340 .await;
341 if let Ok(tx) = txn {
342 Ok(Response::new(SendResponse {
343 txid: tx.to_string(),
344 }))
345 } else {
346 Err(Status::new(Code::Internal, "Transaction sending failed."))
347 }
348 }
349
350 async fn oracle_announcements(
351 &self,
352 request: Request<OracleAnnouncementsRequest>,
353 ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
354 let OracleAnnouncementsRequest { event_id } = request.into_inner();
355 let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
356 let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
357 Ok(Response::new(OracleAnnouncementsResponse { announcement }))
358 }
359
360 async fn create_enum(
361 &self,
362 request: Request<CreateEnumRequest>,
363 ) -> Result<Response<CreateEnumResponse>, Status> {
364 let CreateEnumRequest { maturity, outcomes } = request.into_inner();
365 let announcement = self
366 .node
367 .oracle
368 .create_enum_event(outcomes, maturity)
369 .await
370 .unwrap();
371 let announcement = serde_json::to_vec(&announcement).unwrap();
372 Ok(Response::new(CreateEnumResponse { announcement }))
373 }
374
375 async fn wallet_sync(
376 &self,
377 _request: Request<WalletSyncRequest>,
378 ) -> Result<Response<WalletSyncResponse>, Status> {
379 self.node
380 .wallet
381 .sync()
382 .await
383 .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
384 Ok(Response::new(WalletSyncResponse {}))
385 }
386
387 async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
388 if let Err(e) = self.node.manager.periodic_check(false).await {
389 tracing::error!("Error syncing: {:?}", e);
390 return Err(Status::new(Code::Internal, "Error syncing."));
391 };
392
393 if let Err(e) = self.node.wallet.sync().await {
394 tracing::error!("Error syncing wallet: {:?}", e);
395 return Err(Status::new(Code::Internal, "Error syncing wallet."));
396 };
397
398 Ok(Response::new(SyncResponse {}))
399 }
400}