1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::{Builder, SeedConfig};
10use ddk::oracle::kormir::KormirOracleClient;
11use ddk::storage::postgres::PostgresStore;
12use ddk::transport::nostr::NostrDlc;
13use ddk::util::ser::serialize_contract;
14use ddk::DlcDevKit;
15use ddk::{Oracle, Transport};
16use ddk_manager::contract::contract_input::ContractInput;
17use ddk_manager::Oracle as DlcOracle;
18use ddk_manager::Storage as DlcStorage;
19use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
20use ddkrpc::{
21 AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse, CreateEnumRequest,
22 CreateEnumResponse, GetWalletTransactionsRequest, GetWalletTransactionsResponse,
23 ListContractsRequest, ListContractsResponse, ListOffersRequest, ListOffersResponse,
24 ListOraclesRequest, ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest,
25 ListUtxosResponse, NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest,
26 OracleAnnouncementsResponse, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse,
27 SyncRequest, SyncResponse, WalletBalanceRequest, WalletBalanceResponse, WalletSyncRequest,
28 WalletSyncResponse,
29};
30use ddkrpc::{InfoRequest, InfoResponse};
31use opts::NodeOpts;
32use std::str::FromStr;
33use std::sync::Arc;
34use tonic::transport::Server;
35use tonic::Request;
36use tonic::Response;
37use tonic::Status;
38use tonic::{async_trait, Code};
39
/// Concrete [`DlcDevKit`] instantiation used by this node: `NostrDlc` as the
/// transport, `PostgresStore` as the storage and `KormirOracleClient` as the oracle.
type Ddk = DlcDevKit<NostrDlc, PostgresStore, KormirOracleClient>;
41
/// Cheaply cloneable handle around a shared [`Ddk`] instance.
///
/// Cloning only clones the inner `Arc`, so every clone refers to the same node.
#[derive(Clone)]
pub struct DdkNode {
    /// The shared DDK instance that requests are forwarded to.
    pub node: Arc<Ddk>,
}
46
47impl DdkNode {
48 pub fn new(ddk: Ddk) -> Self {
49 Self {
50 node: Arc::new(ddk),
51 }
52 }
53
54 pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
55 let storage_path = match opts.storage_dir {
56 Some(storage) => storage,
57 None => homedir::my_home()
58 .expect("Provide a directory for ddk.")
59 .unwrap()
60 .join(".ddk")
61 .join("default-ddk"),
62 };
63 let network = Network::from_str(&opts.network)?;
64 std::fs::create_dir_all(storage_path.clone())?;
65
66 let seed_bytes = crate::seed::xprv_from_path(storage_path.clone())?;
67
68 tracing::info!("Starting DDK node.");
69 let transport =
70 Arc::new(NostrDlc::new(&seed_bytes, "wss://nostr.dlcdevkit.com", network).await?);
71
72 let storage = Arc::new(PostgresStore::new(&opts.postgres_url, true, opts.name).await?);
73
74 let oracle = Arc::new(KormirOracleClient::new(&opts.oracle_host, None).await?);
76
77 let mut builder = Builder::new();
78 builder.set_seed_bytes(SeedConfig::Bytes(seed_bytes))?;
79 builder.set_esplora_host(opts.esplora_host);
80 builder.set_network(network);
81 builder.set_transport(transport.clone());
82 builder.set_storage(storage.clone());
83 builder.set_oracle(oracle.clone());
84
85 let ddk: Ddk = builder.finish().await?;
86
87 ddk.start()?;
88 let node = DdkNode::new(ddk);
89 let node_stop = node.node.clone();
90 let server = Server::builder()
91 .add_service(DdkRpcServer::new(node))
92 .serve_with_shutdown(opts.grpc_host.parse()?, async {
93 tokio::signal::ctrl_c()
94 .await
95 .expect("Failed to install Ctrl+C signal handler");
96 let _ = node_stop.stop();
97 });
98
99 server.await?;
100
101 Ok(())
102 }
103}
104
105#[async_trait]
106impl DdkRpc for DdkNode {
107 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
108 async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
109 tracing::info!("Request for node info.");
110 let pubkey = self.node.transport.public_key().to_string();
111 let transport = self.node.transport.name();
112 let oracle = self.node.oracle.name();
113 let response = InfoResponse {
114 pubkey,
115 transport,
116 oracle,
117 };
118 Ok(Response::new(response))
119 }
120
121 #[tracing::instrument(skip(self, request), name = "grpc_server")]
122 async fn send_offer(
123 &self,
124 request: Request<SendOfferRequest>,
125 ) -> Result<Response<SendOfferResponse>, Status> {
126 tracing::info!("Request to send offer.");
127 let SendOfferRequest {
128 contract_input,
129 counter_party,
130 } = request.into_inner();
131 let contract_input: ContractInput =
132 serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
133 let mut oracle_announcements = Vec::new();
134 for info in &contract_input.contract_infos {
135 let announcement = self
136 .node
137 .oracle
138 .get_announcement(&info.oracles.event_id)
139 .await
140 .unwrap();
141 oracle_announcements.push(announcement)
142 }
143
144 let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
145 let offer_msg = self
146 .node
147 .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
148 .await
149 .map_err(|e| {
150 Status::new(
151 Code::Cancelled,
152 format!("Contract offer could not be sent to counterparty. error={e}"),
153 )
154 })?;
155
156 let offer_dlc =
157 serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
158 Ok(Response::new(SendOfferResponse { offer_dlc }))
159 }
160
161 #[tracing::instrument(skip(self, request), name = "grpc_server")]
162 async fn accept_offer(
163 &self,
164 request: Request<AcceptOfferRequest>,
165 ) -> Result<Response<AcceptOfferResponse>, Status> {
166 tracing::info!("Request to accept offer.");
167 let mut contract_id = [0u8; 32];
168 let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
169 contract_id.copy_from_slice(&contract_id_bytes);
170 let (contract_id, counter_party, accept_dlc) =
171 self.node.accept_dlc_offer(contract_id).await.map_err(|e| {
172 Status::new(
173 Code::Cancelled,
174 format!("Contract could not be accepted. error={e:?}"),
175 )
176 })?;
177
178 let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
179 Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
180 })?;
181
182 Ok(Response::new(AcceptOfferResponse {
183 contract_id,
184 counter_party,
185 accept_dlc,
186 }))
187 }
188
189 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
190 async fn new_address(
191 &self,
192 _request: Request<NewAddressRequest>,
193 ) -> Result<Response<NewAddressResponse>, Status> {
194 tracing::info!("Request for new wallet address");
195 let address = self
196 .node
197 .wallet
198 .new_external_address()
199 .await
200 .unwrap()
201 .to_string();
202 let response = NewAddressResponse { address };
203 Ok(Response::new(response))
204 }
205
206 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
207 async fn list_offers(
208 &self,
209 _request: Request<ListOffersRequest>,
210 ) -> Result<Response<ListOffersResponse>, Status> {
211 tracing::info!("Request for offers to the node.");
212 let offers = self.node.storage.get_contract_offers().await.unwrap();
213 let offers: Vec<Vec<u8>> = offers
214 .iter()
215 .map(|offer| serde_json::to_vec(offer).unwrap())
216 .collect();
217
218 Ok(Response::new(ListOffersResponse { offers }))
219 }
220
221 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
222 async fn wallet_balance(
223 &self,
224 _request: Request<WalletBalanceRequest>,
225 ) -> Result<Response<WalletBalanceResponse>, Status> {
226 tracing::info!("Request for wallet balance.");
227 let wallet_balance = self.node.balance().await.unwrap();
228
229 let response = WalletBalanceResponse {
230 confirmed: wallet_balance.confirmed.to_sat(),
231 foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
232 change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
233 contract_balance: wallet_balance.contract_pnl,
234 };
235 Ok(Response::new(response))
236 }
237
238 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
239 async fn get_wallet_transactions(
240 &self,
241 _request: Request<GetWalletTransactionsRequest>,
242 ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
243 tracing::info!("Request for all wallet transactions.");
244 let wallet_transactions = self.node.wallet.get_transactions().await.unwrap();
245 let transactions: Vec<Vec<u8>> = wallet_transactions
246 .iter()
247 .map(|t| serde_json::to_vec(&t).unwrap())
248 .collect();
249 Ok(Response::new(GetWalletTransactionsResponse {
250 transactions,
251 }))
252 }
253
254 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
255 async fn list_utxos(
256 &self,
257 _request: Request<ListUtxosRequest>,
258 ) -> Result<Response<ListUtxosResponse>, Status> {
259 tracing::info!("Request to list all wallet utxos");
260 let utxos = self.node.wallet.list_utxos().await.unwrap();
261 let utxos: Vec<Vec<u8>> = utxos
262 .iter()
263 .map(|utxo| serde_json::to_vec(utxo).unwrap())
264 .collect();
265 Ok(Response::new(ListUtxosResponse { utxos }))
266 }
267
268 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
269 async fn list_peers(
270 &self,
271 _request: Request<ListPeersRequest>,
272 ) -> Result<Response<ListPeersResponse>, Status> {
273 tracing::info!("List peers request");
274 let peers = vec![];
275
276 Ok(Response::new(ListPeersResponse { peers }))
277 }
278
279 #[tracing::instrument(skip(self, request), name = "grpc_server")]
280 async fn connect_peer(
281 &self,
282 request: Request<ConnectRequest>,
283 ) -> Result<Response<ConnectResponse>, Status> {
284 let ConnectRequest { pubkey, host } = request.into_inner();
285 let pubkey = PublicKey::from_str(&pubkey).unwrap();
286 self.node.transport.connect_outbound(pubkey, &host).await;
287 Ok(Response::new(ConnectResponse {}))
288 }
289
290 async fn list_oracles(
291 &self,
292 _request: Request<ListOraclesRequest>,
293 ) -> Result<Response<ListOraclesResponse>, Status> {
294 let pubkey = self.node.oracle.get_public_key().to_string();
295 let name = self.node.oracle.name();
296 Ok(Response::new(ListOraclesResponse { name, pubkey }))
297 }
298
299 async fn list_contracts(
300 &self,
301 _request: Request<ListContractsRequest>,
302 ) -> Result<Response<ListContractsResponse>, Status> {
303 let contracts = self
304 .node
305 .storage
306 .get_contracts()
307 .await
308 .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
309 let contract_bytes: Vec<Vec<u8>> = contracts
310 .iter()
311 .map(|contract| serialize_contract(contract).unwrap())
312 .collect();
313 Ok(Response::new(ListContractsResponse {
314 contracts: contract_bytes,
315 }))
316 }
317
318 async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
319 let SendRequest {
320 address,
321 amount,
322 fee_rate,
323 } = request.into_inner();
324 let address = Address::from_str(&address).unwrap().assume_checked();
325 let amount = Amount::from_sat(amount);
326 let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
327 Some(f) => f,
328 None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
329 };
330 let txn = self
331 .node
332 .wallet
333 .send_to_address(address, amount, fee_rate)
334 .await;
335 if let Ok(tx) = txn {
336 Ok(Response::new(SendResponse {
337 txid: tx.to_string(),
338 }))
339 } else {
340 Err(Status::new(Code::Internal, "Transaction sending failed."))
341 }
342 }
343
344 async fn oracle_announcements(
345 &self,
346 request: Request<OracleAnnouncementsRequest>,
347 ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
348 let OracleAnnouncementsRequest { event_id } = request.into_inner();
349 let oracle_announcement = self.node.oracle.get_announcement(&event_id).await.unwrap();
350 let announcement = serde_json::to_vec(&oracle_announcement).unwrap();
351 Ok(Response::new(OracleAnnouncementsResponse { announcement }))
352 }
353
354 async fn create_enum(
355 &self,
356 request: Request<CreateEnumRequest>,
357 ) -> Result<Response<CreateEnumResponse>, Status> {
358 let CreateEnumRequest { maturity, outcomes } = request.into_inner();
359 let announcement = self
360 .node
361 .oracle
362 .create_enum_event(outcomes, maturity)
363 .await
364 .unwrap();
365 let announcement = serde_json::to_vec(&announcement).unwrap();
366 Ok(Response::new(CreateEnumResponse { announcement }))
367 }
368
369 async fn wallet_sync(
370 &self,
371 _request: Request<WalletSyncRequest>,
372 ) -> Result<Response<WalletSyncResponse>, Status> {
373 self.node
374 .wallet
375 .sync()
376 .await
377 .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
378 Ok(Response::new(WalletSyncResponse {}))
379 }
380
381 async fn sync(&self, _request: Request<SyncRequest>) -> Result<Response<SyncResponse>, Status> {
382 if let Err(e) = self.node.manager.periodic_check(false).await {
383 tracing::error!("Error syncing: {:?}", e);
384 return Err(Status::new(Code::Internal, "Error syncing."));
385 };
386
387 if let Err(e) = self.node.wallet.sync().await {
388 tracing::error!("Error syncing wallet: {:?}", e);
389 return Err(Status::new(Code::Internal, "Error syncing wallet."));
390 };
391
392 Ok(Response::new(SyncResponse {}))
393 }
394}