1pub mod cli_opts;
2pub mod command;
3pub mod ddkrpc;
4pub mod opts;
5mod seed;
6
7use bitcoin::secp256k1::PublicKey;
8use bitcoin::{Address, Amount, FeeRate, Network};
9use ddk::builder::Builder;
10use ddk::oracle::kormir::KormirOracleClient;
11use ddk::storage::sled::SledStorage;
12use ddk::transport::lightning::LightningTransport;
13use ddk::util::serialize_contract;
14use ddk::DlcDevKit;
15use ddk::{Oracle, Storage, Transport};
16use ddk_manager::contract::contract_input::ContractInput;
17use ddk_manager::Oracle as DlcOracle;
18use ddk_manager::Storage as DlcStorage;
19use ddkrpc::ddk_rpc_server::{DdkRpc, DdkRpcServer};
20use ddkrpc::{
21 AcceptOfferRequest, AcceptOfferResponse, ConnectRequest, ConnectResponse,
22 GetWalletTransactionsRequest, GetWalletTransactionsResponse, ListContractsRequest,
23 ListContractsResponse, ListOffersRequest, ListOffersResponse, ListOraclesRequest,
24 ListOraclesResponse, ListPeersRequest, ListPeersResponse, ListUtxosRequest, ListUtxosResponse,
25 NewAddressRequest, NewAddressResponse, OracleAnnouncementsRequest, OracleAnnouncementsResponse,
26 Peer, SendOfferRequest, SendOfferResponse, SendRequest, SendResponse, WalletBalanceRequest,
27 WalletBalanceResponse, WalletSyncRequest, WalletSyncResponse,
28};
29use ddkrpc::{InfoRequest, InfoResponse};
30use opts::NodeOpts;
31use std::str::FromStr;
32use std::sync::Arc;
33use tonic::transport::Server;
34use tonic::Request;
35use tonic::Response;
36use tonic::Status;
37use tonic::{async_trait, Code};
38
39type Ddk = DlcDevKit<LightningTransport, SledStorage, KormirOracleClient>;
40
41#[derive(Clone)]
42pub struct DdkNode {
43 pub node: Arc<Ddk>,
44}
45
46impl DdkNode {
47 pub fn new(ddk: Ddk) -> Self {
48 Self {
49 node: Arc::new(ddk),
50 }
51 }
52
53 pub async fn serve(opts: NodeOpts) -> anyhow::Result<()> {
54 let storage_path = match opts.storage_dir {
55 Some(storage) => storage,
56 None => homedir::my_home()
57 .expect("Provide a directory for ddk.")
58 .unwrap()
59 .join(".ddk")
60 .join("default-ddk"),
61 };
62 let network = Network::from_str(&opts.network)?;
63 std::fs::create_dir_all(storage_path.clone())?;
64
65 let seed_bytes = crate::seed::xprv_from_path(storage_path.clone(), network)?;
66
67 tracing::info!("Starting DDK node.");
68
69 let transport = Arc::new(LightningTransport::new(
70 &seed_bytes.private_key.secret_bytes(),
71 opts.listening_port,
72 )?);
73
74 let storage = Arc::new(SledStorage::new(
75 storage_path.join("sled_db").to_str().unwrap(),
76 )?);
77
78 let oracle = Arc::new(KormirOracleClient::new(&opts.oracle_host, None).await?);
80
81 let mut builder = Builder::new();
82 builder.set_seed_bytes(seed_bytes.private_key.secret_bytes());
83 builder.set_esplora_host(opts.esplora_host);
84 builder.set_network(network);
85 builder.set_transport(transport.clone());
86 builder.set_storage(storage.clone());
87 builder.set_oracle(oracle.clone());
88
89 let ddk: Ddk = builder.finish().await?;
90
91 ddk.start()?;
92 let node = DdkNode::new(ddk);
93 let node_stop = node.node.clone();
94 let server = Server::builder()
95 .add_service(DdkRpcServer::new(node))
96 .serve_with_shutdown(opts.grpc_host.parse()?, async {
97 tokio::signal::ctrl_c()
98 .await
99 .expect("Failed to install Ctrl+C signal handler");
100 let _ = node_stop.stop();
101 });
102
103 server.await?;
104
105 Ok(())
106 }
107}
108
109#[async_trait]
110impl DdkRpc for DdkNode {
111 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
112 async fn info(&self, _request: Request<InfoRequest>) -> Result<Response<InfoResponse>, Status> {
113 tracing::info!("Request for node info.");
114 let pubkey = self.node.transport.node_id.to_string();
115 let transport = self.node.transport.name();
116 let oracle = self.node.oracle.name();
117 let response = InfoResponse {
118 pubkey,
119 transport,
120 oracle,
121 };
122 Ok(Response::new(response))
123 }
124
125 #[tracing::instrument(skip(self, request), name = "grpc_server")]
126 async fn send_offer(
127 &self,
128 request: Request<SendOfferRequest>,
129 ) -> Result<Response<SendOfferResponse>, Status> {
130 tracing::info!("Request to send offer.");
131 let SendOfferRequest {
132 contract_input,
133 counter_party,
134 } = request.into_inner();
135 let contract_input: ContractInput =
136 serde_json::from_slice(&contract_input).expect("couldn't get bytes correct");
137 let mut oracle_announcements = Vec::new();
138 for info in &contract_input.contract_infos {
139 let announcement = self
140 .node
141 .oracle
142 .get_announcement(&info.oracles.event_id)
143 .await
144 .unwrap();
145 oracle_announcements.push(announcement)
146 }
147
148 let counter_party = PublicKey::from_str(&counter_party).expect("no public key");
149 let offer_msg = self
150 .node
151 .send_dlc_offer(&contract_input, counter_party, oracle_announcements)
152 .await
153 .map_err(|e| {
154 Status::new(
155 Code::Cancelled,
156 format!(
157 "Contract offer could not be sent to counterparty. error={:?}",
158 e
159 ),
160 )
161 })?;
162
163 let offer_dlc =
164 serde_json::to_vec(&offer_msg).expect("OfferDlc could not be converted to vec.");
165 Ok(Response::new(SendOfferResponse { offer_dlc }))
166 }
167
168 #[tracing::instrument(skip(self, request), name = "grpc_server")]
169 async fn accept_offer(
170 &self,
171 request: Request<AcceptOfferRequest>,
172 ) -> Result<Response<AcceptOfferResponse>, Status> {
173 tracing::info!("Request to accept offer.");
174 let mut contract_id = [0u8; 32];
175 let contract_id_bytes = hex::decode(&request.into_inner().contract_id).unwrap();
176 contract_id.copy_from_slice(&contract_id_bytes);
177 let (contract_id, counter_party, accept_dlc) = self
178 .node
179 .accept_dlc_offer(contract_id)
180 .await
181 .map_err(|_| Status::new(Code::Cancelled, "Contract could not be accepted."))?;
182
183 let accept_dlc = serde_json::to_vec(&accept_dlc).map_err(|_| {
184 Status::new(Code::Cancelled, "Accept DLC is malformed to create bytes.")
185 })?;
186
187 Ok(Response::new(AcceptOfferResponse {
188 contract_id,
189 counter_party,
190 accept_dlc,
191 }))
192 }
193
194 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
195 async fn new_address(
196 &self,
197 _request: Request<NewAddressRequest>,
198 ) -> Result<Response<NewAddressResponse>, Status> {
199 tracing::info!("Request for new wallet address");
200 let address = self.node.wallet.new_external_address().unwrap().to_string();
201 let response = NewAddressResponse { address };
202 Ok(Response::new(response))
203 }
204
205 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
206 async fn list_offers(
207 &self,
208 _request: Request<ListOffersRequest>,
209 ) -> Result<Response<ListOffersResponse>, Status> {
210 tracing::info!("Request for offers to the node.");
211 let offers = self.node.storage.get_contract_offers().unwrap();
212 let offers: Vec<Vec<u8>> = offers
213 .iter()
214 .map(|offer| serde_json::to_vec(offer).unwrap())
215 .collect();
216
217 Ok(Response::new(ListOffersResponse { offers }))
218 }
219
220 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
221 async fn wallet_balance(
222 &self,
223 _request: Request<WalletBalanceRequest>,
224 ) -> Result<Response<WalletBalanceResponse>, Status> {
225 tracing::info!("Request for wallet balance.");
226 let wallet_balance = self.node.balance().unwrap();
227
228 let response = WalletBalanceResponse {
229 confirmed: wallet_balance.confirmed.to_sat(),
230 foreign_unconfirmed: wallet_balance.foreign_unconfirmed.to_sat(),
231 change_unconfirmed: wallet_balance.change_unconfirmed.to_sat(),
232 contract_balance: wallet_balance.contract_pnl,
233 };
234 Ok(Response::new(response))
235 }
236
237 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
238 async fn get_wallet_transactions(
239 &self,
240 _request: Request<GetWalletTransactionsRequest>,
241 ) -> Result<Response<GetWalletTransactionsResponse>, Status> {
242 tracing::info!("Request for all wallet transactions.");
243 let wallet_transactions = self.node.wallet.get_transactions().unwrap();
244 let transactions: Vec<Vec<u8>> = wallet_transactions
245 .iter()
246 .map(|t| serde_json::to_vec(&t).unwrap())
247 .collect();
248 Ok(Response::new(GetWalletTransactionsResponse {
249 transactions,
250 }))
251 }
252
253 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
254 async fn list_utxos(
255 &self,
256 _request: Request<ListUtxosRequest>,
257 ) -> Result<Response<ListUtxosResponse>, Status> {
258 tracing::info!("Request to list all wallet utxos");
259 let utxos = self.node.wallet.list_utxos().unwrap();
260 let utxos: Vec<Vec<u8>> = utxos
261 .iter()
262 .map(|utxo| serde_json::to_vec(utxo).unwrap())
263 .collect();
264 Ok(Response::new(ListUtxosResponse { utxos }))
265 }
266
267 #[tracing::instrument(skip(self, _request), name = "grpc_server")]
268 async fn list_peers(
269 &self,
270 _request: Request<ListPeersRequest>,
271 ) -> Result<Response<ListPeersResponse>, Status> {
272 tracing::info!("List peers request");
273 let peers = self.node.transport.peer_manager.list_peers();
274 let peers = peers
275 .iter()
276 .map(|peer| {
277 let host = match &peer.socket_address {
278 Some(h) => h.to_string(),
279 None => "".to_string(),
280 };
281 let pubkey = peer.counterparty_node_id.to_string();
282 Peer { pubkey, host }
283 })
284 .collect::<Vec<Peer>>();
285
286 Ok(Response::new(ListPeersResponse { peers }))
287 }
288
289 #[tracing::instrument(skip(self, request), name = "grpc_server")]
290 async fn connect_peer(
291 &self,
292 request: Request<ConnectRequest>,
293 ) -> Result<Response<ConnectResponse>, Status> {
294 let ConnectRequest { pubkey, host } = request.into_inner();
295 let pubkey = PublicKey::from_str(&pubkey).unwrap();
296 self.node.transport.connect_outbound(pubkey, &host).await;
297 Ok(Response::new(ConnectResponse {}))
298 }
299
300 async fn list_oracles(
301 &self,
302 _request: Request<ListOraclesRequest>,
303 ) -> Result<Response<ListOraclesResponse>, Status> {
304 let pubkey = self.node.oracle.get_pubkey().await.unwrap().to_string();
305 let name = self.node.oracle.name();
306 Ok(Response::new(ListOraclesResponse { name, pubkey }))
307 }
308
309 async fn list_contracts(
310 &self,
311 _request: Request<ListContractsRequest>,
312 ) -> Result<Response<ListContractsResponse>, Status> {
313 let contracts = self
314 .node
315 .storage
316 .get_contracts()
317 .map_err(|e| Status::new(Code::Cancelled, e.to_string()))?;
318 let contract_bytes: Vec<Vec<u8>> = contracts
319 .iter()
320 .map(|contract| serialize_contract(contract).unwrap())
321 .collect();
322 Ok(Response::new(ListContractsResponse {
323 contracts: contract_bytes,
324 }))
325 }
326
327 async fn send(&self, request: Request<SendRequest>) -> Result<Response<SendResponse>, Status> {
328 let SendRequest {
329 address,
330 amount,
331 fee_rate,
332 } = request.into_inner();
333 let address = Address::from_str(&address).unwrap().assume_checked();
334 let amount = Amount::from_sat(amount);
335 let fee_rate = match FeeRate::from_sat_per_vb(fee_rate) {
336 Some(f) => f,
337 None => return Err(Status::new(Code::InvalidArgument, "Invalid fee rate.")),
338 };
339 let txn = self
340 .node
341 .wallet
342 .send_to_address(address, amount, fee_rate)
343 .await;
344 if let Ok(tx) = txn {
345 Ok(Response::new(SendResponse {
346 txid: tx.to_string(),
347 }))
348 } else {
349 Err(Status::new(Code::Internal, "Transaction sending failed."))
350 }
351 }
352
353 async fn oracle_announcements(
354 &self,
355 _request: Request<OracleAnnouncementsRequest>,
356 ) -> Result<Response<OracleAnnouncementsResponse>, Status> {
357 let announcements: Vec<Vec<u8>> = self
358 .node
359 .storage
360 .get_marketplace_announcements()
361 .unwrap()
363 .iter()
364 .map(|ann| serde_json::to_vec(ann).unwrap())
365 .collect();
366 Ok(Response::new(OracleAnnouncementsResponse { announcements }))
367 }
368
369 async fn wallet_sync(
370 &self,
371 _request: Request<WalletSyncRequest>,
372 ) -> Result<Response<WalletSyncResponse>, Status> {
373 self.node
374 .wallet
375 .sync()
376 .await
377 .map_err(|_| Status::new(Code::Aborted, "Did not sync wallet."))?;
378 Ok(Response::new(WalletSyncResponse {}))
379 }
380}