1use super::{
4 Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13 accepted_contract::AcceptedContract, contract_info::ContractInfo,
14 contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15 signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16 FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::hex::DisplayHex;
26use bitcoin::{Address, Amount, SignedAmount};
27use bitcoin::{OutPoint, Transaction};
28use ddk_messages::channel::{
29 AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
30 RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
31 SettleOffer, SignChannel,
32};
33use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
34use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
35use futures::stream;
36use futures::stream::FuturesUnordered;
37use futures::{StreamExt, TryStreamExt};
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40 build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use lightning::util::logger::Logger;
43use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
44use once_cell::sync::Lazy;
45use secp256k1_zkp::XOnlyPublicKey;
46use secp256k1_zkp::{
47 ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
48};
49use std::collections::HashMap;
50use std::ops::Deref;
51use std::string::ToString;
52use std::sync::Arc;
53use tokio::sync::Mutex;
54
55pub const DEFAULT_NB_CONFIRMATIONS: u32 = 3;
57
58static NB_CONFIRMATIONS: Lazy<u32> = Lazy::new(|| match std::env::var("NB_CONFIRMATIONS") {
61 Ok(val) => val.parse().unwrap_or(DEFAULT_NB_CONFIRMATIONS),
62 Err(_) => DEFAULT_NB_CONFIRMATIONS,
63});
64
65static AUTOMATIC_REFUND: Lazy<bool> = Lazy::new(|| match std::env::var("AUTOMATIC_REFUND") {
67 Ok(val) => val.parse().unwrap_or(true),
68 Err(_) => true,
69});
70
/// Refund delay handed to `offer_contract` and used (together with twice its
/// value) when validating incoming offers.
// NOTE(review): presumably expressed in seconds, i.e. seven days; confirm.
pub const REFUND_DELAY: u32 = 7 * 24 * 60 * 60;
/// `nSequence`-related constant for CETs.
// NOTE(review): exact semantics are not visible in this part of the file.
pub const CET_NSEQUENCE: u32 = 288;
/// Peer timeout.
// NOTE(review): presumably expressed in seconds, i.e. one hour; confirm.
pub const PEER_TIMEOUT: u64 = 60 * 60;
78
/// Outcome of searching a signed contract for a closable contract info:
/// `None` when nothing is closable yet, otherwise the matching
/// [`ContractInfo`] / [`AdaptorInfo`] pair plus the collected attestations,
/// each tagged with a `usize` oracle index.
type ClosableContractInfo<'a> = Option<(
    &'a ContractInfo,
    &'a AdaptorInfo,
    Vec<(usize, OracleAttestation)>,
)>;
84
/// Central coordinator for DLC contracts and channels: it reacts to incoming
/// messages, persists state through the store and runs the periodic checks.
///
/// Every collaborator is injected through a `Deref` handle so callers can
/// choose the pointer type (`Arc`, `&`, ...) for each component.
#[derive(Debug)]
pub struct Manager<
    W: Deref,
    SP: Deref,
    B: Deref,
    S: Deref,
    O: Deref,
    T: Deref,
    F: Deref,
    X: ContractSigner,
    L: Deref,
> where
    W::Target: Wallet,
    SP::Target: ContractSignerProvider<Signer = X>,
    B::Target: Blockchain,
    S::Target: Storage,
    O::Target: Oracle,
    T::Target: Time,
    F::Target: FeeEstimator,
    L::Target: Logger,
{
    /// Oracle clients, looked up by the oracle's x-only public key.
    oracles: HashMap<XOnlyPublicKey, O>,
    /// Wallet handle (used, among others, to import funding addresses).
    wallet: W,
    /// Provider used to derive key ids and contract signers.
    signer_provider: SP,
    /// Blockchain access: height, blocks, confirmations, broadcasting.
    blockchain: B,
    /// Persistent storage for contracts, channels and the chain monitor.
    store: S,
    /// Shared secp256k1 context.
    secp: Secp256k1<All>,
    /// Chain monitor state, guarded by an async mutex.
    chain_monitor: Mutex<ChainMonitor>,
    /// Time source (`unix_time_now`).
    time: T,
    /// Fee estimator handle.
    fee_estimator: F,
    /// Logger used by the `log_*!` macros.
    logger: L,
}
118
/// Shorthand for `get_object_in_state!` specialised to contracts: forwards the
/// manager, contract id, expected state identifier and optional peer id,
/// adding the `Contract` type and the `get_contract` accessor name.
///
/// Call sites in this file apply `?` to the result.
// NOTE(review): the exact lookup/validation semantics live in
// `get_object_in_state!` (crate::utils), not shown here.
macro_rules! get_contract_in_state {
    ($manager: ident, $contract_id: expr, $state: ident, $peer_id: expr) => {{
        get_object_in_state!(
            $manager,
            $contract_id,
            $state,
            $peer_id,
            Contract,
            get_contract
        )
    }};
}
131
/// Shorthand for `get_object_in_state!` specialised to channels: forwards the
/// manager, channel id, expected state identifier and optional peer id,
/// adding the `Channel` type and the `get_channel` accessor name.
// NOTE(review): the exact lookup/validation semantics live in
// `get_object_in_state!` (crate::utils), not shown here.
macro_rules! get_channel_in_state {
    ($manager: ident, $channel_id: expr, $state: ident, $peer_id: expr) => {{
        get_object_in_state!(
            $manager,
            $channel_id,
            $state,
            $peer_id,
            Channel,
            get_channel
        )
    }};
}
144
/// Extracts the listed fields from a signed channel's `roll_back_state` when
/// it is the `SignedChannelState::$state` variant.
///
/// Evaluates to `Ok((field, ...))` with references to the requested fields, or
/// to `Err(Error::InvalidState(..))` when the rollback state is missing or is
/// a different variant.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state{$($field,)* ..}) => Ok(($($field,)*)),
            // Report the rollback state that was actually inspected rather
            // than the channel's current state, so the message matches the
            // check that failed.
            _ => Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($state),
                $signed_channel.roll_back_state
            ))),
        }
    }};
}
153
/// Force closes every signed channel stored in state `$state` whose `timeout`
/// is earlier than the manager's current unix time.
///
/// Expands to statements that use `.await` and `?`, so it must be invoked
/// inside an async function returning a compatible `Result`. A failing force
/// close is logged and does not stop the iteration.
macro_rules! check_for_timed_out_channels {
    ($manager: ident, $state: ident) => {
        let candidates = $manager
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))
            .await?;

        for candidate in candidates {
            // Channels returned in another state are skipped.
            let SignedChannelState::$state { timeout, .. } = candidate.state else {
                continue;
            };
            if timeout < $manager.time.unix_time_now() {
                if let Err(e) = $manager
                    .force_close_channel_internal(candidate, true)
                    .await
                {
                    log_error!($manager.logger, "Error force closing channel. error={}", e)
                }
            }
        }
    };
}
176
177impl<
178 W: Deref,
179 SP: Deref,
180 B: Deref,
181 S: Deref,
182 O: Deref,
183 T: Deref,
184 F: Deref,
185 X: ContractSigner,
186 L: Deref,
187 > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
188where
189 W::Target: Wallet,
190 SP::Target: ContractSignerProvider<Signer = X>,
191 B::Target: Blockchain,
192 S::Target: Storage,
193 O::Target: Oracle,
194 T::Target: Time,
195 F::Target: FeeEstimator,
196 L::Target: Logger,
197{
198 #[allow(clippy::too_many_arguments)]
200 #[tracing::instrument(skip_all)]
201 pub async fn new(
202 wallet: W,
203 signer_provider: SP,
204 blockchain: B,
205 store: S,
206 oracles: HashMap<XOnlyPublicKey, O>,
207 time: T,
208 fee_estimator: F,
209 logger: L,
210 ) -> Result<Self, Error> {
211 let init_height = blockchain.get_blockchain_height().await?;
212 let chain_monitor = Mutex::new(
213 store
214 .get_chain_monitor()
215 .await?
216 .unwrap_or(ChainMonitor::new(init_height)),
217 );
218
219 let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
220
221 log_info!(logger, "Manager initialized");
222
223 Ok(Manager {
224 secp: secp256k1_zkp::Secp256k1::new(),
225 wallet,
226 signer_provider,
227 blockchain,
228 store,
229 oracles,
230 time,
231 fee_estimator,
232 chain_monitor,
233 logger,
234 })
235 }
236
    /// Returns a reference to the storage handle held by this manager.
    pub fn get_store(&self) -> &S {
        &self.store
    }
241
242 #[tracing::instrument(skip_all)]
244 pub async fn on_dlc_message(
245 &self,
246 msg: &DlcMessage,
247 counter_party: PublicKey,
248 ) -> Result<Option<DlcMessage>, Error> {
249 match msg {
250 DlcMessage::Offer(o) => {
251 log_debug!(self.logger, "Received offer message");
252 self.on_offer_message(o, counter_party).await?;
253 Ok(None)
254 }
255 DlcMessage::Accept(a) => {
256 log_debug!(self.logger, "Received accept message");
257 Ok(Some(self.on_accept_message(a, &counter_party).await?))
258 }
259 DlcMessage::Sign(s) => {
260 log_debug!(self.logger, "Received sign message");
261 self.on_sign_message(s, &counter_party).await?;
262 Ok(None)
263 }
264 DlcMessage::Close(c) => {
265 log_debug!(self.logger, "Received close message");
266 self.on_close_message(c, &counter_party).await?;
267 Ok(None)
268 }
269 DlcMessage::OfferChannel(o) => {
270 log_debug!(self.logger, "Received offer channel message");
271 self.on_offer_channel(o, counter_party).await?;
272 Ok(None)
273 }
274 DlcMessage::AcceptChannel(a) => {
275 log_debug!(self.logger, "Received accept channel message");
276 Ok(Some(DlcMessage::SignChannel(
277 self.on_accept_channel(a, &counter_party).await?,
278 )))
279 }
280 DlcMessage::SignChannel(s) => {
281 log_debug!(self.logger, "Received sign channel message");
282 self.on_sign_channel(s, &counter_party).await?;
283 Ok(None)
284 }
285 DlcMessage::SettleOffer(s) => {
286 log_debug!(self.logger, "Received settle offer message");
287 match self.on_settle_offer(s, &counter_party).await? {
288 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
289 None => Ok(None),
290 }
291 }
292 DlcMessage::SettleAccept(s) => {
293 log_debug!(self.logger, "Received settle accept message");
294 Ok(Some(DlcMessage::SettleConfirm(
295 self.on_settle_accept(s, &counter_party).await?,
296 )))
297 }
298 DlcMessage::SettleConfirm(s) => {
299 log_debug!(self.logger, "Received settle confirm message");
300 Ok(Some(DlcMessage::SettleFinalize(
301 self.on_settle_confirm(s, &counter_party).await?,
302 )))
303 }
304 DlcMessage::SettleFinalize(s) => {
305 log_debug!(self.logger, "Received settle finalize message");
306 self.on_settle_finalize(s, &counter_party).await?;
307 Ok(None)
308 }
309 DlcMessage::RenewOffer(r) => {
310 log_debug!(self.logger, "Received renew offer message");
311 match self.on_renew_offer(r, &counter_party).await? {
312 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
313 None => Ok(None),
314 }
315 }
316 DlcMessage::RenewAccept(r) => {
317 log_debug!(self.logger, "Received renew accept message");
318 Ok(Some(DlcMessage::RenewConfirm(
319 self.on_renew_accept(r, &counter_party).await?,
320 )))
321 }
322 DlcMessage::RenewConfirm(r) => {
323 log_debug!(self.logger, "Received renew confirm message");
324 Ok(Some(DlcMessage::RenewFinalize(
325 self.on_renew_confirm(r, &counter_party).await?,
326 )))
327 }
328 DlcMessage::RenewFinalize(r) => {
329 log_debug!(self.logger, "Received renew finalize message");
330 let revoke = self.on_renew_finalize(r, &counter_party).await?;
331 Ok(Some(DlcMessage::RenewRevoke(revoke)))
332 }
333 DlcMessage::RenewRevoke(r) => {
334 log_debug!(self.logger, "Received renew revoke message");
335 self.on_renew_revoke(r, &counter_party).await?;
336 Ok(None)
337 }
338 DlcMessage::CollaborativeCloseOffer(c) => {
339 log_debug!(self.logger, "Received collaborative close offer message");
340 self.on_collaborative_close_offer(c, &counter_party).await?;
341 Ok(None)
342 }
343 DlcMessage::Reject(r) => {
344 log_debug!(self.logger, "Received reject message");
345 self.on_reject(r, &counter_party).await?;
346 Ok(None)
347 }
348 }
349 }
350
351 #[tracing::instrument(skip_all)]
353 pub async fn send_splice_offer(
354 &self,
355 contract_input: &ContractInput,
356 counter_party: PublicKey,
357 contract_id: &ContractId,
358 ) -> Result<OfferDlc, Error> {
359 let oracle_announcements = self.oracle_announcements(contract_input).await?;
360
361 self.send_splice_offer_with_announcements(
362 contract_input,
363 counter_party,
364 contract_id,
365 oracle_announcements,
366 )
367 .await
368 }
369
370 #[tracing::instrument(skip_all)]
373 pub async fn send_splice_offer_with_announcements(
374 &self,
375 contract_input: &ContractInput,
376 counter_party: PublicKey,
377 contract_id: &ContractId,
378 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
379 ) -> Result<OfferDlc, Error> {
380 let confirmed_contract =
381 get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
382
383 let dlc_input = confirmed_contract.get_dlc_input();
384
385 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
386 &self.secp,
387 contract_input,
388 oracle_announcements,
389 vec![dlc_input],
390 REFUND_DELAY,
391 &counter_party,
392 &self.wallet,
393 &self.blockchain,
394 &self.time,
395 &self.signer_provider,
396 &self.logger,
397 )
398 .await?;
399
400 offered_contract.validate()?;
401
402 self.store.create_contract(&offered_contract).await?;
403
404 Ok(offer_msg)
405 }
406
407 #[tracing::instrument(skip_all)]
412 pub async fn send_offer(
413 &self,
414 contract_input: &ContractInput,
415 counter_party: PublicKey,
416 ) -> Result<OfferDlc, Error> {
417 let oracle_announcements = self.oracle_announcements(contract_input).await?;
419
420 self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
421 .await
422 }
423
424 #[tracing::instrument(skip_all)]
430 pub async fn send_offer_with_announcements(
431 &self,
432 contract_input: &ContractInput,
433 counter_party: PublicKey,
434 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
435 ) -> Result<OfferDlc, Error> {
436 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
437 &self.secp,
438 contract_input,
439 oracle_announcements,
440 vec![],
441 REFUND_DELAY,
442 &counter_party,
443 &self.wallet,
444 &self.blockchain,
445 &self.time,
446 &self.signer_provider,
447 &self.logger,
448 )
449 .await?;
450
451 offered_contract.validate()?;
452
453 self.store.create_contract(&offered_contract).await?;
454
455 Ok(offer_msg)
456 }
457
458 #[tracing::instrument(skip_all)]
460 pub async fn accept_contract_offer(
461 &self,
462 contract_id: &ContractId,
463 ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
464 let offered_contract =
465 get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
466
467 let counter_party = offered_contract.counter_party;
468
469 let (accepted_contract, accept_msg) = accept_contract(
470 &self.secp,
471 &offered_contract,
472 &self.wallet,
473 &self.signer_provider,
474 &self.blockchain,
475 &self.logger,
476 )
477 .await?;
478
479 self.wallet.import_address(&Address::p2wsh(
480 &accepted_contract.dlc_transactions.funding_script_pubkey,
481 self.blockchain.get_network()?,
482 ))?;
483
484 let contract_id = accepted_contract.get_contract_id();
485
486 self.store
487 .update_contract(&Contract::Accepted(accepted_contract))
488 .await?;
489
490 log_info!(
491 self.logger,
492 "Accepted and stored the contract. temp_id={} contract_id={}",
493 offered_contract.id.to_lower_hex_string(),
494 contract_id.to_lower_hex_string(),
495 );
496
497 Ok((contract_id, counter_party, accept_msg))
498 }
499
500 #[tracing::instrument(skip_all)]
506 pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
507 let cur_height = self.blockchain.get_blockchain_height().await?;
508 let last_height = self.chain_monitor.lock().await.last_height;
509
510 if cur_height < last_height {
513 return Err(Error::InvalidState(
514 "Current height is lower than last height.".to_string(),
515 ));
516 }
517
518 for height in last_height + 1..=cur_height {
519 let block = self.blockchain.get_block_at_height(height).await?;
520
521 self.chain_monitor
522 .lock()
523 .await
524 .process_block(&block, height);
525 }
526
527 Ok(())
528 }
529
530 #[tracing::instrument(skip_all, level = "debug")]
533 pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
534 let signed_contracts = self.check_for_spliced_contract().await?;
535 self.check_signed_contracts(&signed_contracts).await?;
536 self.check_confirmed_contracts().await?;
537 self.check_preclosed_contracts().await?;
538
539 if check_channels {
540 self.channel_checks().await?;
541 }
542
543 Ok(())
544 }
545
546 #[tracing::instrument(skip_all)]
548 pub async fn on_offer_message(
549 &self,
550 offered_message: &OfferDlc,
551 counter_party: PublicKey,
552 ) -> Result<(), Error> {
553 offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
554 let keys_id = self
555 .signer_provider
556 .derive_signer_key_id(false, offered_message.temporary_contract_id);
557 let contract: OfferedContract =
558 OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
559 contract.validate()?;
560
561 if self.store.get_contract(&contract.id).await?.is_some() {
562 return Err(Error::InvalidParameters(
563 "Contract with identical id already exists".to_string(),
564 ));
565 }
566
567 self.store.create_contract(&contract).await?;
568 log_info!(
569 self.logger,
570 "Created and stored the offered contract. temp_id={}",
571 contract.id.to_lower_hex_string(),
572 );
573 Ok(())
574 }
575
576 #[tracing::instrument(skip_all)]
578 pub async fn on_close_message(
579 &self,
580 close_msg: &CloseDlc,
581 counter_party: &PublicKey,
582 ) -> Result<(), Error> {
583 let signed_contract = get_contract_in_state!(
585 self,
586 &close_msg.contract_id,
587 Confirmed,
588 Some(*counter_party)
589 )?;
590
591 let _close_tx = crate::contract_updater::complete_cooperative_close(
594 &self.secp,
595 &signed_contract,
596 close_msg,
597 &self.signer_provider,
598 &self.logger,
599 )?;
600
601 Ok(())
604 }
605
606 #[tracing::instrument(skip_all)]
608 pub async fn on_accept_message(
609 &self,
610 accept_msg: &AcceptDlc,
611 counter_party: &PublicKey,
612 ) -> Result<DlcMessage, Error> {
613 let offered_contract = get_contract_in_state!(
614 self,
615 &accept_msg.temporary_contract_id,
616 Offered,
617 Some(*counter_party)
618 )?;
619
620 let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
621 &self.secp,
622 &offered_contract,
623 accept_msg,
624 &self.wallet,
625 &self.signer_provider,
626 &self.store,
627 &self.logger,
628 )
629 .await
630 {
631 Ok(contract) => contract,
632 Err(e) => {
633 log_error!(
634 self.logger,
635 "Error in on_accept_message. tmp_contract_id={} error={}",
636 offered_contract.id.to_lower_hex_string(),
637 e.to_string()
638 );
639 return self
640 .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
641 .await;
642 }
643 };
644
645 log_info!(
646 self.logger,
647 "Verified the accept message and signed the contract. temp_id={} contract_id={}",
648 offered_contract.id.to_lower_hex_string(),
649 signed_contract.accepted_contract.get_contract_id_string(),
650 );
651
652 let contract_id = signed_contract.accepted_contract.get_contract_id_string();
653
654 self.wallet.import_address(&Address::p2wsh(
655 &signed_contract
656 .accepted_contract
657 .dlc_transactions
658 .funding_script_pubkey,
659 self.blockchain.get_network()?,
660 ))?;
661
662 self.store
663 .update_contract(&Contract::Signed(signed_contract))
664 .await?;
665
666 log_info!(
667 self.logger,
668 "Accepted and signed the contract. temp_id={} contract_id={}",
669 offered_contract.id.to_lower_hex_string(),
670 contract_id,
671 );
672
673 Ok(DlcMessage::Sign(signed_msg))
674 }
675
676 #[tracing::instrument(skip_all)]
678 pub async fn on_sign_message(
679 &self,
680 sign_message: &SignDlc,
681 peer_id: &PublicKey,
682 ) -> Result<(), Error> {
683 let accepted_contract =
684 get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
685 let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
686 &self.secp,
687 &accepted_contract,
688 sign_message,
689 &self.wallet,
690 &self.store,
691 &self.signer_provider,
692 &self.logger,
693 )
694 .await
695 {
696 Ok(contract) => contract,
697 Err(e) => {
698 log_error!(
699 self.logger,
700 "Error in on_sign_message. contract_id={} error={}",
701 accepted_contract.get_contract_id_string(),
702 e.to_string()
703 );
704 return self
705 .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
706 .await;
707 }
708 };
709
710 self.store
711 .update_contract(&Contract::Signed(signed_contract.clone()))
712 .await?;
713
714 self.blockchain.send_transaction(&fund_tx).await?;
715
716 if accepted_contract
719 .offered_contract
720 .funding_inputs
721 .iter()
722 .any(|c| c.dlc_input.is_some())
723 {
724 let Some(dlc_input) = accepted_contract
725 .offered_contract
726 .funding_inputs
727 .iter()
728 .find(|c| c.dlc_input.is_some())
729 else {
730 return Ok(());
731 };
732 let preclosed_contract = get_contract_in_state!(
733 self,
734 &dlc_input.dlc_input.as_ref().unwrap().contract_id,
735 Confirmed,
736 None as Option<PublicKey>
737 )?;
738
739 let preclosed_contract = PreClosedContract {
740 signed_contract: preclosed_contract.clone(),
741 attestations: None,
742 signed_cet: signed_contract
743 .accepted_contract
744 .dlc_transactions
745 .fund
746 .clone(),
747 };
748
749 log_debug!(self.logger,
750 "Contract contains a DLC input. Marking the previous contract as pre-closed. dlc_input_contract_id={}",
751 preclosed_contract.signed_contract.accepted_contract.get_contract_id_string()
752 );
753
754 self.store
755 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
756 .await?;
757 }
758
759 Ok(())
760 }
761
762 #[tracing::instrument(skip_all, level = "debug")]
763 async fn get_oracle_announcements(
764 &self,
765 oracle_inputs: &OracleInput,
766 ) -> Result<Vec<OracleAnnouncement>, Error> {
767 let mut announcements = Vec::new();
768 for pubkey in &oracle_inputs.public_keys {
769 let oracle = self
770 .oracles
771 .get(pubkey)
772 .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
773 let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
774 announcements.push(announcement);
775 }
776
777 Ok(announcements)
778 }
779
780 async fn sign_fail_on_error<R>(
781 &self,
782 accepted_contract: AcceptedContract,
783 sign_message: SignDlc,
784 e: Error,
785 ) -> Result<R, Error> {
786 log_error!(
787 self.logger,
788 "Error in on_sign_message marking contract as failed sign. contract_id={} error={}",
789 accepted_contract.get_contract_id_string(),
790 e.to_string()
791 );
792 self.store
793 .update_contract(&Contract::FailedSign(FailedSignContract {
794 accepted_contract,
795 sign_message,
796 error_message: e.to_string(),
797 }))
798 .await?;
799 Err(e)
800 }
801
802 async fn accept_fail_on_error<R>(
803 &self,
804 offered_contract: OfferedContract,
805 accept_message: AcceptDlc,
806 e: Error,
807 ) -> Result<R, Error> {
808 log_error!(
809 self.logger,
810 "Error in on_accept_message marking contract as failed accept. contract_id={} error={}",
811 offered_contract.id.to_lower_hex_string(),
812 e.to_string()
813 );
814 self.store
815 .update_contract(&Contract::FailedAccept(FailedAcceptContract {
816 offered_contract,
817 accept_message,
818 error_message: e.to_string(),
819 }))
820 .await?;
821 Err(e)
822 }
823
824 #[tracing::instrument(skip_all, level = "debug")]
825 async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
826 let confirmations = self
827 .blockchain
828 .get_transaction_confirmations(
829 &contract
830 .accepted_contract
831 .dlc_transactions
832 .fund
833 .compute_txid(),
834 )
835 .await?;
836 if confirmations >= *NB_CONFIRMATIONS {
837 log_info!(
838 self.logger,
839 "Marking signed contract as confirmed. confirmations={} contract_id={}",
840 confirmations,
841 contract.accepted_contract.get_contract_id_string(),
842 );
843 self.store
844 .update_contract(&Contract::Confirmed(contract.clone()))
845 .await?;
846 } else {
847 log_debug!(self.logger,
848 "Not enough confirmations to mark contract as confirmed. confirmations={} required={} contract_id={}",
849 confirmations,
850 *NB_CONFIRMATIONS,
851 contract.accepted_contract.get_contract_id_string(),
852 );
853 }
854 Ok(())
855 }
856
857 #[tracing::instrument(skip_all, level = "debug")]
858 async fn check_signed_contracts(
859 &self,
860 signed_contracts: &[SignedContract],
861 ) -> Result<(), Error> {
862 for c in signed_contracts {
863 if let Err(e) = self.check_signed_contract(c).await {
864 log_error!(
865 self.logger,
866 "Error checking signed contract. contract_id={} error={}",
867 c.accepted_contract.get_contract_id_string(),
868 e.to_string()
869 )
870 }
871 }
872
873 Ok(())
874 }
875
876 #[tracing::instrument(skip_all, level = "debug")]
877 async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
878 let contracts = self.get_store().get_signed_contracts().await?;
879 for contract in &contracts {
880 let dlc_input = contract
881 .accepted_contract
882 .offered_contract
883 .funding_inputs
884 .iter()
885 .find(|d| d.dlc_input.is_some());
886
887 let Some(funding_input) = dlc_input else {
888 continue;
889 };
890
891 let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
892
893 let confirmed_contract_in_splice = match get_contract_in_state!(
894 self,
895 &contract_id,
896 Confirmed,
897 None as Option<PublicKey>
898 ) {
899 Ok(c) => Ok(c),
900 Err(Error::InvalidState(e)) => {
901 log_trace!(self.logger,
902 "The previouse contract referenced in a splice transaction is in an unexpected state. contract_id={} error={}",
903 contract_id.to_lower_hex_string(), e.to_string(),
904 );
905 continue;
906 }
907 Err(e) => {
908 log_debug!(self.logger,
909 "The previouse contract referenced in a splice transaction failed to retrieve. contract_id={} error={}",
910 contract_id.to_lower_hex_string(), e.to_string(),
911 );
912 Err(e)
913 }
914 }?;
915
916 let preclosed_contract = PreClosedContract {
917 signed_contract: confirmed_contract_in_splice.clone(),
918 attestations: None,
919 signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
920 };
921
922 log_debug!(self.logger,
923 "The previous contract that was spliced is now closed because the splice contract is confirmed. contract_id={}",
924 contract_id.to_lower_hex_string(),
925 );
926
927 self.store
928 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
929 .await?;
930 }
931 Ok(contracts)
932 }
933
934 #[tracing::instrument(skip_all, level = "debug")]
935 async fn check_confirmed_contracts(&self) -> Result<(), Error> {
936 for c in self.store.get_confirmed_contracts().await? {
937 if c.channel_id.is_some() {
939 continue;
940 }
941 if let Err(e) = self.check_confirmed_contract(&c).await {
942 log_error!(
943 self.logger,
944 "Error checking confirmed contract. contract_id={} error={}",
945 c.accepted_contract.get_contract_id_string(),
946 e.to_string()
947 )
948 }
949 }
950
951 Ok(())
952 }
953
954 #[tracing::instrument(skip_all, level = "debug")]
955 async fn get_closable_contract_info<'a>(
956 &'a self,
957 contract: &'a SignedContract,
958 ) -> ClosableContractInfo<'a> {
959 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
960 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
961 for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
962 log_debug!(
963 self.logger,
964 "Checking contract for oracle maturation. contract_id={}",
965 contract.accepted_contract.get_contract_id_string()
966 );
967 let matured: Vec<_> = contract_info
968 .oracle_announcements
969 .iter()
970 .filter(|x| {
971 (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
972 })
973 .enumerate()
974 .collect();
975 if matured.len() >= contract_info.threshold {
976 let attestations = stream::iter(matured.iter())
977 .map(|(i, announcement)| async move {
978 log_debug!(self.logger,
979 "Oracle announcement for contract is matured. Getting attestations. contract_id={} event_id={}",
980 contract.accepted_contract.get_contract_id_string(),
981 announcement.oracle_event.event_id
982 );
983 let oracle = match self.oracles.get(&announcement.oracle_public_key) {
985 Some(oracle) => oracle,
986 None => {
987 log_debug!(self.logger,
988 "Oracle not found. pubkey={}. contract_id={} event_id={}",
989 announcement.oracle_public_key,
990 contract.accepted_contract.get_contract_id_string(),
991 announcement.oracle_event.event_id
992 );
993 return None;
994 }
995 };
996 let attestation = match oracle
998 .get_attestation(&announcement.oracle_event.event_id)
999 .await
1000 {
1001 Ok(attestation) => attestation,
1002 Err(_) => {
1003 return None;
1010 }
1011 };
1012 if let Err(e) = attestation.validate(&self.secp, announcement) {
1014 log_error!(self.logger,
1015 "Oracle attestation is not valid. pubkey={} event_id={}, error={}",
1016 announcement.oracle_public_key,
1017 announcement.oracle_event.event_id,
1018 e.to_string()
1019 );
1020 return None;
1021 }
1022 log_info!(self.logger,
1023 "Retrieved a valid attestation. pubkey={} event_id={} outcomes={:?}",
1024 announcement.oracle_public_key,
1025 announcement.oracle_event.event_id,
1026 attestation.outcomes
1027 );
1028 Some((*i, attestation))
1029 })
1030 .collect::<FuturesUnordered<_>>()
1031 .await
1032 .filter_map(|result| async move { result }) .collect::<Vec<_>>()
1034 .await;
1035 if attestations.len() >= contract_info.threshold {
1036 log_info!(self.logger,
1037 "Found enough attestations to close contract. contract_id={} attestations={}",
1038 contract.accepted_contract.get_contract_id_string(),
1039 attestations.len()
1040 );
1041 return Some((contract_info, adaptor_info, attestations));
1042 }
1043 }
1044 }
1045 None
1046 }
1047
1048 #[tracing::instrument(skip_all, level = "debug")]
1049 async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
1050 let closable_contract_info = self.get_closable_contract_info(contract).await;
1056 if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
1057 log_debug!(
1058 self.logger,
1059 "Found closable contract info. contract_id={} attestations={}",
1060 contract.accepted_contract.get_contract_id_string(),
1061 attestations.len()
1062 );
1063 let offer = &contract.accepted_contract.offered_contract;
1064 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1065
1066 if let Ok(cet) = crate::contract_updater::get_signed_cet(
1072 &self.secp,
1073 contract,
1074 contract_info,
1075 adaptor_info,
1076 &attestations,
1077 &signer,
1078 &self.logger,
1079 ) {
1080 log_info!(
1081 self.logger,
1082 "Found valid CET. Closing contract. contract_id={}",
1083 contract.accepted_contract.get_contract_id_string()
1084 );
1085 match self
1086 .close_contract(
1087 contract,
1088 cet,
1089 attestations.iter().map(|x| x.1.clone()).collect(),
1090 )
1091 .await
1092 {
1093 Ok(closed_contract) => {
1094 log_info!(
1095 self.logger,
1096 "Updated contract to closed. contract_id={}",
1097 contract.accepted_contract.get_contract_id_string()
1098 );
1099 self.store.update_contract(&closed_contract).await?;
1100 return Ok(());
1101 }
1102 Err(e) => {
1103 log_warn!(
1104 self.logger,
1105 "Failed to close contract. contract_id={} error={}",
1106 contract.accepted_contract.get_contract_id_string(),
1107 e.to_string()
1108 );
1109 return Err(e);
1110 }
1111 }
1112 }
1113 }
1114
1115 for pending_close_tx in &contract
1117 .accepted_contract
1118 .dlc_transactions
1119 .pending_close_txs
1120 {
1121 let confirmations = self
1122 .blockchain
1123 .get_transaction_confirmations(&pending_close_tx.compute_txid())
1124 .await?;
1125
1126 log_debug!(
1127 self.logger,
1128 "Checking pending close transaction. close_txid={} contract_id={} confirmations={}",
1129 pending_close_tx.compute_txid().to_string(),
1130 contract.accepted_contract.get_contract_id_string(),
1131 confirmations
1132 );
1133
1134 if confirmations >= *NB_CONFIRMATIONS {
1135 log_info!(self.logger,
1137 "Pending close transaction is fully confirmed. Moving to closed. close_txid={} contract_id={}",
1138 pending_close_tx.compute_txid().to_string(),
1139 contract.accepted_contract.get_contract_id_string()
1140 );
1141 let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
1142 let closed_contract = ClosedContract {
1143 attestations: None, signed_cet: Some(pending_close_tx.clone()), contract_id: contract.accepted_contract.get_contract_id(),
1146 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1147 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1148 pnl,
1149 funding_txid: contract
1150 .accepted_contract
1151 .dlc_transactions
1152 .fund
1153 .compute_txid(),
1154 signed_contract: contract.clone(),
1155 };
1156
1157 self.store
1158 .update_contract(&Contract::Closed(closed_contract))
1159 .await?;
1160 break; } else if confirmations >= 1 {
1162 log_debug!(self.logger,
1164 "Found a confirmed but not fully confirmed pending close. Moving to preclosed. close_txid={} contract_id={}",
1165 pending_close_tx.compute_txid().to_string(),
1166 contract.accepted_contract.get_contract_id_string()
1167 );
1168 let preclosed_contract = PreClosedContract {
1169 signed_contract: contract.clone(),
1170 attestations: None, signed_cet: pending_close_tx.clone(),
1172 };
1173
1174 self.store
1175 .update_contract(&Contract::PreClosed(preclosed_contract))
1176 .await?;
1177 break; }
1179 }
1180
1181 self.check_refund(contract).await?;
1182
1183 Ok(())
1184 }
1185
1186 #[tracing::instrument(skip_all, level = "debug")]
1188 pub async fn close_confirmed_contract(
1189 &self,
1190 contract_id: &ContractId,
1191 attestations: Vec<(usize, OracleAttestation)>,
1192 ) -> Result<Contract, Error> {
1193 log_info!(
1194 self.logger,
1195 "Attempting to close confirmed contract manually. contract_id={} outcomes={:?}",
1196 contract_id.to_lower_hex_string(),
1197 attestations
1198 .iter()
1199 .map(|(_, a)| a.outcomes.clone())
1200 .collect::<Vec<_>>()
1201 );
1202 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1203 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
1204 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
1205
1206 if let Some((contract_info, adaptor_info)) =
1208 contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
1209 let matches = attestations
1210 .iter()
1211 .filter(|(i, a)| {
1212 c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
1213 })
1214 .count();
1215
1216 matches >= c.threshold
1217 })
1218 {
1219 let offer = &contract.accepted_contract.offered_contract;
1220 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1221 log_debug!(
1222 self.logger,
1223 "Getting signed CET. contract_id={}",
1224 contract.accepted_contract.get_contract_id_string()
1225 );
1226 let cet = crate::contract_updater::get_signed_cet(
1227 &self.secp,
1228 &contract,
1229 contract_info,
1230 adaptor_info,
1231 &attestations,
1232 &signer,
1233 &self.logger,
1234 )?;
1235
1236 let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
1238 .expect("Time is not in valid range. This should never happen.");
1239 let height =
1240 Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
1241 .expect("Height is not in valid range. This should never happen.");
1242 let locktime = cet.lock_time;
1243
1244 if !locktime.is_satisfied_by(height, time) {
1245 return Err(Error::InvalidState(
1246 "CET lock time has not passed yet".to_string(),
1247 ));
1248 }
1249
1250 match self
1251 .close_contract(
1252 &contract,
1253 cet,
1254 attestations.into_iter().map(|x| x.1).collect(),
1255 )
1256 .await
1257 {
1258 Ok(closed_contract) => {
1259 log_info!(
1260 self.logger,
1261 "Closed contract manually. contract_id={}",
1262 contract.accepted_contract.get_contract_id_string()
1263 );
1264 self.store.update_contract(&closed_contract).await?;
1265 Ok(closed_contract)
1266 }
1267 Err(e) => {
1268 log_error!(
1269 self.logger,
1270 "Failed to close contract. contract_id={} error={}",
1271 contract.accepted_contract.get_contract_id_string(),
1272 e.to_string()
1273 );
1274 Err(e)
1275 }
1276 }
1277 } else {
1278 Err(Error::InvalidState(
1279 "Attestations did not match contract infos".to_string(),
1280 ))
1281 }
1282 }
1283
1284 #[tracing::instrument(skip_all, level = "debug")]
1285 async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1286 for c in self.store.get_preclosed_contracts().await? {
1287 if let Err(e) = self.check_preclosed_contract(&c).await {
1288 log_error!(
1289 self.logger,
1290 "Error checking pre-closed contract. contract_id={} error={}",
1291 c.signed_contract.accepted_contract.get_contract_id_string(),
1292 e
1293 )
1294 }
1295 }
1296
1297 Ok(())
1298 }
1299
1300 #[tracing::instrument(skip_all, level = "debug")]
1301 async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1302 let broadcasted_txid = contract.signed_cet.compute_txid();
1303 let confirmations = self
1304 .blockchain
1305 .get_transaction_confirmations(&broadcasted_txid)
1306 .await?;
1307 log_debug!(
1308 self.logger,
1309 "Checking pre-closed contract. broadcasted_txid={} contract_id={} confirmations={}",
1310 broadcasted_txid.to_string(),
1311 contract
1312 .signed_contract
1313 .accepted_contract
1314 .get_contract_id_string(),
1315 confirmations
1316 );
1317 if confirmations >= *NB_CONFIRMATIONS {
1318 log_debug!(self.logger,
1319 "Pre-closed contract is fully confirmed. Moving to closed. broadcasted_txid={} contract_id={}",
1320 broadcasted_txid.to_string(),
1321 contract.signed_contract.accepted_contract.get_contract_id_string()
1322 );
1323 let pnl = contract
1324 .signed_contract
1325 .accepted_contract
1326 .compute_pnl(&contract.signed_cet);
1327
1328 let closed_contract = ClosedContract {
1329 attestations: contract.attestations.clone(),
1330 signed_cet: Some(contract.signed_cet.clone()),
1331 contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1332 temporary_contract_id: contract
1333 .signed_contract
1334 .accepted_contract
1335 .offered_contract
1336 .id,
1337 counter_party_id: contract
1338 .signed_contract
1339 .accepted_contract
1340 .offered_contract
1341 .counter_party,
1342 funding_txid: contract
1343 .signed_contract
1344 .accepted_contract
1345 .dlc_transactions
1346 .fund
1347 .compute_txid(),
1348 pnl,
1349 signed_contract: contract.signed_contract.clone(),
1350 };
1351 self.store
1352 .update_contract(&Contract::Closed(closed_contract))
1353 .await?;
1354 }
1355
1356 Ok(())
1357 }
1358
1359 #[tracing::instrument(skip_all, level = "debug")]
1360 async fn close_contract(
1361 &self,
1362 contract: &SignedContract,
1363 signed_cet: Transaction,
1364 attestations: Vec<OracleAttestation>,
1365 ) -> Result<Contract, Error> {
1366 let confirmations = self
1367 .blockchain
1368 .get_transaction_confirmations(&signed_cet.compute_txid())
1369 .await?;
1370
1371 if confirmations < 1 {
1372 log_info!(
1373 self.logger,
1374 "Broadcasting signed CET. txid={} contract_id={}",
1375 signed_cet.compute_txid().to_string(),
1376 contract.accepted_contract.get_contract_id_string()
1377 );
1378 self.blockchain.send_transaction(&signed_cet).await?;
1383
1384 let preclosed_contract = PreClosedContract {
1385 signed_contract: contract.clone(),
1386 attestations: Some(attestations),
1387 signed_cet,
1388 };
1389
1390 return Ok(Contract::PreClosed(preclosed_contract));
1391 } else if confirmations < *NB_CONFIRMATIONS {
1392 log_debug!(self.logger,
1393 "Found a confirmed but not fully confirmed pending close. Moving to preclosed. txid={} contract_id={}",
1394 signed_cet.compute_txid().to_string(),
1395 contract.accepted_contract.get_contract_id_string()
1396 );
1397 let preclosed_contract = PreClosedContract {
1398 signed_contract: contract.clone(),
1399 attestations: Some(attestations),
1400 signed_cet,
1401 };
1402
1403 return Ok(Contract::PreClosed(preclosed_contract));
1404 }
1405
1406 let closed_contract = ClosedContract {
1407 attestations: Some(attestations.to_vec()),
1408 pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1409 signed_cet: Some(signed_cet),
1410 contract_id: contract.accepted_contract.get_contract_id(),
1411 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1412 funding_txid: contract
1413 .accepted_contract
1414 .dlc_transactions
1415 .fund
1416 .compute_txid(),
1417 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1418 signed_contract: contract.clone(),
1419 };
1420
1421 Ok(Contract::Closed(closed_contract))
1422 }
1423
1424 #[tracing::instrument(skip_all, level = "debug")]
1426 pub async fn check_and_broadcast_refund(
1427 &self,
1428 contract_id: &ContractId,
1429 ) -> Result<Contract, Error> {
1430 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1432
1433 if contract
1434 .accepted_contract
1435 .dlc_transactions
1436 .refund
1437 .lock_time
1438 .to_consensus_u32() as u64
1439 <= self.time.unix_time_now()
1440 {
1441 log_debug!(self.logger,
1442 "Refund locktime has passed. Attempting to broadcast refund. refund_txid={} contract_id={}",
1443 contract.accepted_contract.dlc_transactions.refund.compute_txid().to_string(),
1444 contract.accepted_contract.get_contract_id_string()
1445 );
1446 let accepted_contract = &contract.accepted_contract;
1447 let refund = accepted_contract.dlc_transactions.refund.clone();
1448 let confirmations = self
1449 .blockchain
1450 .get_transaction_confirmations(&refund.compute_txid())
1451 .await?;
1452 if confirmations == 0 {
1453 log_debug!(self.logger,
1454 "Refund transaction has not been broadcast yet. Sending transaction txid={} contract_id={}",
1455 refund.compute_txid().to_string(),
1456 contract.accepted_contract.get_contract_id_string()
1457 );
1458 let offer = &contract.accepted_contract.offered_contract;
1459 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1460 let refund = crate::contract_updater::get_signed_refund(
1461 &self.secp,
1462 &contract,
1463 &signer,
1464 &self.logger,
1465 )?;
1466 self.blockchain.send_transaction(&refund).await?;
1467 }
1468
1469 let refunded = Contract::Refunded(contract.clone());
1470 self.store.update_contract(&refunded).await?;
1471 Ok(refunded)
1472 } else {
1473 return Err(Error::InvalidParameters(
1474 "Contract maturity has not passed to broadcast refund".to_string(),
1475 ));
1476 }
1477 }
1478
1479 #[tracing::instrument(skip_all, level = "debug")]
1480 async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1481 if contract
1482 .accepted_contract
1483 .dlc_transactions
1484 .refund
1485 .lock_time
1486 .to_consensus_u32() as u64
1487 <= self.time.unix_time_now()
1488 {
1489 let refund_txid = contract
1490 .accepted_contract
1491 .dlc_transactions
1492 .refund
1493 .compute_txid();
1494 let confirmations = self
1495 .blockchain
1496 .get_transaction_confirmations(&refund_txid)
1497 .await?;
1498
1499 if confirmations > 0 {
1500 self.store
1502 .update_contract(&Contract::Refunded(contract.clone()))
1503 .await?;
1504 } else if *AUTOMATIC_REFUND {
1505 self.check_and_broadcast_refund(&contract.accepted_contract.get_contract_id())
1506 .await?;
1507 }
1508 }
1509
1510 Ok(())
1511 }
1512
1513 #[tracing::instrument(skip_all, level = "debug")]
1516 pub async fn on_counterparty_close(
1517 &mut self,
1518 contract: &SignedContract,
1519 closing_tx: Transaction,
1520 confirmations: u32,
1521 ) -> Result<Contract, Error> {
1522 if !closing_tx.input.iter().any(|i| {
1524 i.previous_output
1525 == contract
1526 .accepted_contract
1527 .dlc_transactions
1528 .get_fund_outpoint()
1529 }) {
1530 log_error!(
1531 self.logger,
1532 "Closing tx does not spend the funding tx. txid={} contract_id={}",
1533 closing_tx.compute_txid().to_string(),
1534 contract.accepted_contract.get_contract_id_string()
1535 );
1536 return Err(Error::InvalidParameters(
1537 "Closing tx does not spend the funding tx".to_string(),
1538 ));
1539 }
1540
1541 if contract
1543 .accepted_contract
1544 .dlc_transactions
1545 .refund
1546 .compute_txid()
1547 == closing_tx.compute_txid()
1548 {
1549 log_debug!(
1550 self.logger,
1551 "Closing tx is the refund tx. Moving to refunded. txid={} contract_id={}",
1552 closing_tx.compute_txid().to_string(),
1553 contract.accepted_contract.get_contract_id_string()
1554 );
1555 let refunded = Contract::Refunded(contract.clone());
1556 self.store.update_contract(&refunded).await?;
1557 return Ok(refunded);
1558 }
1559
1560 let contract = if confirmations < *NB_CONFIRMATIONS {
1561 log_info!(self.logger,
1562 "Closing transaction is not fully confirmed. Moving to preclosed. txid={} contract_id={}",
1563 closing_tx.compute_txid().to_string(),
1564 contract.accepted_contract.get_contract_id_string()
1565 );
1566 Contract::PreClosed(PreClosedContract {
1567 signed_contract: contract.clone(),
1568 attestations: None, signed_cet: closing_tx,
1570 })
1571 } else {
1572 log_info!(
1573 self.logger,
1574 "Closing transaction is fully confirmed. Moving to closed. txid={} contract_id={}",
1575 closing_tx.compute_txid().to_string(),
1576 contract.accepted_contract.get_contract_id_string()
1577 );
1578 Contract::Closed(ClosedContract {
1579 attestations: None, pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1581 signed_cet: Some(closing_tx),
1582 contract_id: contract.accepted_contract.get_contract_id(),
1583 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1584 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1585 funding_txid: contract
1586 .accepted_contract
1587 .dlc_transactions
1588 .fund
1589 .compute_txid(),
1590 signed_contract: contract.clone(),
1591 })
1592 };
1593
1594 self.store.update_contract(&contract).await?;
1595
1596 Ok(contract)
1597 }
1598
1599 #[tracing::instrument(skip_all, level = "debug")]
1603 pub async fn cooperative_close_contract(
1604 &self,
1605 contract_id: &ContractId,
1606 counter_payout: Amount,
1607 ) -> Result<(CloseDlc, PublicKey), Error> {
1608 let signed_contract =
1609 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1610
1611 let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1612 &self.secp,
1613 &signed_contract,
1614 counter_payout,
1615 &self.signer_provider,
1616 &self.logger,
1617 )?;
1618
1619 let mut updated_dlc_transactions =
1621 signed_contract.accepted_contract.dlc_transactions.clone();
1622 updated_dlc_transactions
1623 .pending_close_txs
1624 .push(close_tx.clone());
1625 log_debug!(
1626 self.logger,
1627 "Created updated contract with pending close transaction. close_txid={} contract_id={}",
1628 close_tx.compute_txid().to_string(),
1629 contract_id.to_lower_hex_string()
1630 );
1631
1632 let updated_accepted_contract = AcceptedContract {
1633 dlc_transactions: updated_dlc_transactions,
1634 ..signed_contract.accepted_contract.clone()
1635 };
1636
1637 let updated_signed_contract = SignedContract {
1638 accepted_contract: updated_accepted_contract,
1639 ..signed_contract.clone()
1640 };
1641
1642 self.store
1644 .update_contract(&Contract::Confirmed(updated_signed_contract))
1645 .await?;
1646
1647 let counter_party = signed_contract
1648 .accepted_contract
1649 .offered_contract
1650 .counter_party;
1651
1652 Ok((close_message, counter_party))
1653 }
1654
1655 #[tracing::instrument(skip_all, level = "debug")]
1658 pub async fn accept_cooperative_close(
1659 &self,
1660 contract_id: &ContractId,
1661 close_message: &CloseDlc,
1662 ) -> Result<(), Error> {
1663 let signed_contract =
1664 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1665
1666 let close_tx = crate::contract_updater::complete_cooperative_close(
1667 &self.secp,
1668 &signed_contract,
1669 close_message,
1670 &self.signer_provider,
1671 &self.logger,
1672 )?;
1673
1674 log_debug!(
1675 self.logger,
1676 "Completed cooperative close. close_txid={} contract_id={}",
1677 close_tx.compute_txid().to_string(),
1678 contract_id.to_lower_hex_string()
1679 );
1680 self.blockchain.send_transaction(&close_tx).await?;
1682
1683 let preclosed_contract = PreClosedContract {
1685 signed_contract,
1686 attestations: None,
1687 signed_cet: close_tx,
1688 };
1689
1690 self.store
1691 .update_contract(&Contract::PreClosed(preclosed_contract))
1692 .await?;
1693
1694 Ok(())
1695 }
1696}
1697
1698impl<
1699 W: Deref,
1700 SP: Deref,
1701 B: Deref,
1702 S: Deref,
1703 O: Deref,
1704 T: Deref,
1705 F: Deref,
1706 X: ContractSigner,
1707 L: Deref,
1708 > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
1709where
1710 W::Target: Wallet,
1711 SP::Target: ContractSignerProvider<Signer = X>,
1712 B::Target: Blockchain,
1713 S::Target: Storage,
1714 O::Target: Oracle,
1715 T::Target: Time,
1716 F::Target: FeeEstimator,
1717 L::Target: Logger,
1718{
1719 pub async fn offer_channel(
1722 &self,
1723 contract_input: &ContractInput,
1724 counter_party: PublicKey,
1725 ) -> Result<OfferChannel, Error> {
1726 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1727
1728 let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1729 &self.secp,
1730 contract_input,
1731 &counter_party,
1732 &oracle_announcements,
1733 CET_NSEQUENCE,
1734 REFUND_DELAY,
1735 &self.wallet,
1736 &self.signer_provider,
1737 &self.blockchain,
1738 &self.time,
1739 crate::utils::get_new_temporary_id(),
1740 )
1741 .await?;
1742
1743 let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1744
1745 self.store
1746 .upsert_channel(
1747 Channel::Offered(offered_channel),
1748 Some(Contract::Offered(offered_contract)),
1749 )
1750 .await?;
1751
1752 Ok(msg)
1753 }
1754
1755 pub async fn reject_channel(
1758 &self,
1759 channel_id: &ChannelId,
1760 ) -> Result<(Reject, PublicKey), Error> {
1761 let offered_channel =
1762 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1763
1764 if offered_channel.is_offer_party {
1765 return Err(Error::InvalidState(
1766 "Cannot reject channel initiated by us.".to_string(),
1767 ));
1768 }
1769
1770 let offered_contract = get_contract_in_state!(
1771 self,
1772 &offered_channel.offered_contract_id,
1773 Offered,
1774 None as Option<PublicKey>
1775 )?;
1776
1777 let counterparty = offered_channel.counter_party;
1778 self.store
1779 .upsert_channel(
1780 Channel::Cancelled(offered_channel),
1781 Some(Contract::Rejected(offered_contract)),
1782 )
1783 .await?;
1784
1785 let msg = Reject {
1786 channel_id: *channel_id,
1787 };
1788 Ok((msg, counterparty))
1789 }
1790
1791 pub async fn accept_channel(
1795 &self,
1796 channel_id: &ChannelId,
1797 ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1798 let offered_channel =
1799 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1800
1801 if offered_channel.is_offer_party {
1802 return Err(Error::InvalidState(
1803 "Cannot accept channel initiated by us.".to_string(),
1804 ));
1805 }
1806
1807 let offered_contract = get_contract_in_state!(
1808 self,
1809 &offered_channel.offered_contract_id,
1810 Offered,
1811 None as Option<PublicKey>
1812 )?;
1813
1814 let (accepted_channel, accepted_contract, accept_channel) =
1815 crate::channel_updater::accept_channel_offer(
1816 &self.secp,
1817 &offered_channel,
1818 &offered_contract,
1819 &self.wallet,
1820 &self.signer_provider,
1821 &self.blockchain,
1822 )
1823 .await?;
1824
1825 self.wallet.import_address(&Address::p2wsh(
1826 &accepted_contract.dlc_transactions.funding_script_pubkey,
1827 self.blockchain.get_network()?,
1828 ))?;
1829
1830 let channel_id = accepted_channel.channel_id;
1831 let contract_id = accepted_contract.get_contract_id();
1832 let counter_party = accepted_contract.offered_contract.counter_party;
1833
1834 self.store
1835 .upsert_channel(
1836 Channel::Accepted(accepted_channel),
1837 Some(Contract::Accepted(accepted_contract)),
1838 )
1839 .await?;
1840
1841 Ok((accept_channel, channel_id, contract_id, counter_party))
1842 }
1843
1844 pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1846 let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1847
1848 self.force_close_channel_internal(channel, true).await
1849 }
1850
1851 pub async fn settle_offer(
1855 &self,
1856 channel_id: &ChannelId,
1857 counter_payout: Amount,
1858 ) -> Result<(SettleOffer, PublicKey), Error> {
1859 let mut signed_channel =
1860 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1861
1862 let msg = crate::channel_updater::settle_channel_offer(
1863 &self.secp,
1864 &mut signed_channel,
1865 counter_payout,
1866 PEER_TIMEOUT,
1867 &self.signer_provider,
1868 &self.time,
1869 )?;
1870
1871 let counter_party = signed_channel.counter_party;
1872
1873 self.store
1874 .upsert_channel(Channel::Signed(signed_channel), None)
1875 .await?;
1876
1877 Ok((msg, counter_party))
1878 }
1879
1880 pub async fn accept_settle_offer(
1883 &self,
1884 channel_id: &ChannelId,
1885 ) -> Result<(SettleAccept, PublicKey), Error> {
1886 let mut signed_channel =
1887 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1888
1889 let msg = crate::channel_updater::settle_channel_accept(
1890 &self.secp,
1891 &mut signed_channel,
1892 CET_NSEQUENCE,
1893 0,
1894 PEER_TIMEOUT,
1895 &self.signer_provider,
1896 &self.time,
1897 &self.chain_monitor,
1898 )
1899 .await?;
1900
1901 let counter_party = signed_channel.counter_party;
1902
1903 self.store
1904 .upsert_channel(Channel::Signed(signed_channel), None)
1905 .await?;
1906
1907 Ok((msg, counter_party))
1908 }
1909
1910 pub async fn renew_offer(
1914 &self,
1915 channel_id: &ChannelId,
1916 counter_payout: Amount,
1917 contract_input: &ContractInput,
1918 ) -> Result<(RenewOffer, PublicKey), Error> {
1919 let mut signed_channel =
1920 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1921
1922 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1923
1924 let (msg, offered_contract) = crate::channel_updater::renew_offer(
1925 &self.secp,
1926 &mut signed_channel,
1927 contract_input,
1928 oracle_announcements,
1929 counter_payout,
1930 REFUND_DELAY,
1931 PEER_TIMEOUT,
1932 CET_NSEQUENCE,
1933 &self.signer_provider,
1934 &self.time,
1935 )?;
1936
1937 let counter_party = offered_contract.counter_party;
1938
1939 self.store
1940 .upsert_channel(
1941 Channel::Signed(signed_channel),
1942 Some(Contract::Offered(offered_contract)),
1943 )
1944 .await?;
1945
1946 Ok((msg, counter_party))
1947 }
1948
1949 pub async fn accept_renew_offer(
1953 &self,
1954 channel_id: &ChannelId,
1955 ) -> Result<(RenewAccept, PublicKey), Error> {
1956 let mut signed_channel =
1957 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1958 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1959 Error::InvalidState("Expected to have a contract id but did not.".to_string())
1960 })?;
1961
1962 let offered_contract = get_contract_in_state!(
1963 self,
1964 &offered_contract_id,
1965 Offered,
1966 None as Option<PublicKey>
1967 )?;
1968
1969 let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1970 &self.secp,
1971 &mut signed_channel,
1972 &offered_contract,
1973 CET_NSEQUENCE,
1974 PEER_TIMEOUT,
1975 &self.signer_provider,
1976 &self.time,
1977 )?;
1978
1979 let counter_party = signed_channel.counter_party;
1980
1981 self.store
1982 .upsert_channel(
1983 Channel::Signed(signed_channel),
1984 Some(Contract::Accepted(accepted_contract)),
1985 )
1986 .await?;
1987
1988 Ok((msg, counter_party))
1989 }
1990
1991 pub async fn reject_renew_offer(
1995 &self,
1996 channel_id: &ChannelId,
1997 ) -> Result<(Reject, PublicKey), Error> {
1998 let mut signed_channel =
1999 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2000 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2001 Error::InvalidState(
2002 "Expected to be in a state with an associated contract id but was not.".to_string(),
2003 )
2004 })?;
2005
2006 let offered_contract = get_contract_in_state!(
2007 self,
2008 &offered_contract_id,
2009 Offered,
2010 None as Option<PublicKey>
2011 )?;
2012
2013 let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
2014
2015 let counter_party = signed_channel.counter_party;
2016
2017 self.store
2018 .upsert_channel(
2019 Channel::Signed(signed_channel),
2020 Some(Contract::Rejected(offered_contract)),
2021 )
2022 .await?;
2023
2024 Ok((reject_msg, counter_party))
2025 }
2026
2027 pub async fn reject_settle_offer(
2031 &self,
2032 channel_id: &ChannelId,
2033 ) -> Result<(Reject, PublicKey), Error> {
2034 let mut signed_channel =
2035 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2036
2037 let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
2038
2039 let counter_party = signed_channel.counter_party;
2040
2041 self.store
2042 .upsert_channel(Channel::Signed(signed_channel), None)
2043 .await?;
2044
2045 Ok((msg, counter_party))
2046 }
2047
2048 pub async fn offer_collaborative_close(
2053 &self,
2054 channel_id: &ChannelId,
2055 counter_payout: Amount,
2056 additional_inputs: Vec<OutPoint>,
2057 ) -> Result<CollaborativeCloseOffer, Error> {
2058 let mut signed_channel =
2059 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2060
2061 let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
2062 &self.secp,
2063 &mut signed_channel,
2064 counter_payout,
2065 additional_inputs,
2066 &self.signer_provider,
2067 &self.time,
2068 )?;
2069
2070 self.chain_monitor.lock().await.add_tx(
2071 close_tx.compute_txid(),
2072 ChannelInfo {
2073 channel_id: *channel_id,
2074 tx_type: TxType::CollaborativeClose,
2075 },
2076 );
2077
2078 self.store
2079 .upsert_channel(Channel::Signed(signed_channel), None)
2080 .await?;
2081 self.store
2082 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2083 .await?;
2084
2085 Ok(msg)
2086 }
2087
2088 pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
2091 let signed_channel =
2092 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2093
2094 let closed_contract = if let Some(SignedChannelState::Established {
2095 signed_contract_id,
2096 ..
2097 }) = &signed_channel.roll_back_state
2098 {
2099 let counter_payout = get_signed_channel_state!(
2100 signed_channel,
2101 CollaborativeCloseOffered,
2102 counter_payout
2103 )?;
2104 Some(
2105 self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
2106 .await?,
2107 )
2108 } else {
2109 None
2110 };
2111
2112 let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
2113 &self.secp,
2114 &signed_channel,
2115 &self.signer_provider,
2116 )?;
2117
2118 self.blockchain.send_transaction(&close_tx).await?;
2119
2120 self.store.upsert_channel(closed_channel, None).await?;
2121
2122 if let Some(closed_contract) = closed_contract {
2123 self.store
2124 .update_contract(&Contract::Closed(closed_contract))
2125 .await?;
2126 }
2127
2128 Ok(())
2129 }
2130
2131 async fn try_finalize_closing_established_channel(
2132 &self,
2133 signed_channel: SignedChannel,
2134 ) -> Result<(), Error> {
2135 let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
2136 signed_channel,
2137 Closing,
2138 buffer_transaction,
2139 contract_id,
2140 is_initiator
2141 )?;
2142
2143 if self
2144 .blockchain
2145 .get_transaction_confirmations(&buffer_tx.compute_txid())
2146 .await?
2147 >= CET_NSEQUENCE
2148 {
2149 log_info!(
2150 self.logger,
2151 "Buffer transaction for contract {} has enough confirmations to spend from it",
2152 serialize_hex(&contract_id)
2153 );
2154
2155 let confirmed_contract =
2156 get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
2157
2158 let (contract_info, adaptor_info, attestations) = self
2159 .get_closable_contract_info(&confirmed_contract)
2160 .await
2161 .ok_or_else(|| {
2162 Error::InvalidState("Could not get information to close contract".to_string())
2163 })?;
2164
2165 let (signed_cet, closed_channel) =
2166 crate::channel_updater::finalize_unilateral_close_settled_channel(
2167 &self.secp,
2168 &signed_channel,
2169 &confirmed_contract,
2170 contract_info,
2171 &attestations,
2172 adaptor_info,
2173 &self.signer_provider,
2174 is_initiator,
2175 )?;
2176
2177 let closed_contract = self
2178 .close_contract(
2179 &confirmed_contract,
2180 signed_cet,
2181 attestations.iter().map(|x| &x.1).cloned().collect(),
2182 )
2183 .await?;
2184
2185 self.chain_monitor
2186 .lock()
2187 .await
2188 .cleanup_channel(signed_channel.channel_id);
2189
2190 self.store
2191 .upsert_channel(closed_channel, Some(closed_contract))
2192 .await?;
2193 }
2194
2195 Ok(())
2196 }
2197
2198 async fn on_offer_channel(
2199 &self,
2200 offer_channel: &OfferChannel,
2201 counter_party: PublicKey,
2202 ) -> Result<(), Error> {
2203 offer_channel.validate(
2204 &self.secp,
2205 REFUND_DELAY,
2206 REFUND_DELAY * 2,
2207 CET_NSEQUENCE,
2208 CET_NSEQUENCE * 2,
2209 )?;
2210
2211 let keys_id = self
2212 .signer_provider
2213 .derive_signer_key_id(false, offer_channel.temporary_contract_id);
2214 let (channel, contract) =
2215 OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
2216
2217 contract.validate()?;
2218
2219 if self
2220 .store
2221 .get_channel(&channel.temporary_channel_id)
2222 .await?
2223 .is_some()
2224 {
2225 return Err(Error::InvalidParameters(
2226 "Channel with identical idea already in store".to_string(),
2227 ));
2228 }
2229
2230 self.store
2231 .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
2232 .await?;
2233
2234 Ok(())
2235 }
2236
2237 async fn on_accept_channel(
2238 &self,
2239 accept_channel: &AcceptChannel,
2240 peer_id: &PublicKey,
2241 ) -> Result<SignChannel, Error> {
2242 let offered_channel = get_channel_in_state!(
2243 self,
2244 &accept_channel.temporary_channel_id,
2245 Offered,
2246 Some(*peer_id)
2247 )?;
2248 let offered_contract = get_contract_in_state!(
2249 self,
2250 &offered_channel.offered_contract_id,
2251 Offered,
2252 Some(*peer_id)
2253 )?;
2254
2255 let (signed_channel, signed_contract, sign_channel) = {
2256 let res = crate::channel_updater::verify_and_sign_accepted_channel(
2257 &self.secp,
2258 &offered_channel,
2259 &offered_contract,
2260 accept_channel,
2261 CET_NSEQUENCE,
2263 &self.wallet,
2264 &self.signer_provider,
2265 &self.store,
2266 &self.chain_monitor,
2267 &self.logger,
2268 )
2269 .await;
2270
2271 match res {
2272 Ok(res) => res,
2273 Err(e) => {
2274 let channel = crate::channel::FailedAccept {
2275 temporary_channel_id: accept_channel.temporary_channel_id,
2276 error_message: format!("Error validating accept channel: {e}"),
2277 accept_message: accept_channel.clone(),
2278 counter_party: *peer_id,
2279 };
2280 self.store
2281 .upsert_channel(Channel::FailedAccept(channel), None)
2282 .await?;
2283 return Err(e);
2284 }
2285 }
2286 };
2287
2288 self.wallet.import_address(&Address::p2wsh(
2289 &signed_contract
2290 .accepted_contract
2291 .dlc_transactions
2292 .funding_script_pubkey,
2293 self.blockchain.get_network()?,
2294 ))?;
2295
2296 if let SignedChannelState::Established {
2297 buffer_transaction, ..
2298 } = &signed_channel.state
2299 {
2300 self.chain_monitor.lock().await.add_tx(
2301 buffer_transaction.compute_txid(),
2302 ChannelInfo {
2303 channel_id: signed_channel.channel_id,
2304 tx_type: TxType::BufferTx,
2305 },
2306 );
2307 } else {
2308 unreachable!();
2309 }
2310
2311 self.store
2312 .upsert_channel(
2313 Channel::Signed(signed_channel),
2314 Some(Contract::Signed(signed_contract)),
2315 )
2316 .await?;
2317
2318 self.store
2319 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2320 .await?;
2321
2322 Ok(sign_channel)
2323 }
2324
2325 async fn on_sign_channel(
2326 &self,
2327 sign_channel: &SignChannel,
2328 peer_id: &PublicKey,
2329 ) -> Result<(), Error> {
2330 let accepted_channel =
2331 get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
2332 let accepted_contract = get_contract_in_state!(
2333 self,
2334 &accepted_channel.accepted_contract_id,
2335 Accepted,
2336 Some(*peer_id)
2337 )?;
2338
2339 let (signed_channel, signed_contract, signed_fund_tx) = {
2340 let res = verify_signed_channel(
2341 &self.secp,
2342 &accepted_channel,
2343 &accepted_contract,
2344 sign_channel,
2345 &self.wallet,
2346 &self.chain_monitor,
2347 &self.store,
2348 &self.signer_provider,
2349 &self.logger,
2350 )
2351 .await;
2352
2353 match res {
2354 Ok(res) => res,
2355 Err(e) => {
2356 let channel = crate::channel::FailedSign {
2357 channel_id: sign_channel.channel_id,
2358 error_message: format!("Error validating accept channel: {e}"),
2359 sign_message: sign_channel.clone(),
2360 counter_party: *peer_id,
2361 };
2362 self.store
2363 .upsert_channel(Channel::FailedSign(channel), None)
2364 .await?;
2365 return Err(e);
2366 }
2367 }
2368 };
2369
2370 if let SignedChannelState::Established {
2371 buffer_transaction, ..
2372 } = &signed_channel.state
2373 {
2374 self.chain_monitor.lock().await.add_tx(
2375 buffer_transaction.compute_txid(),
2376 ChannelInfo {
2377 channel_id: signed_channel.channel_id,
2378 tx_type: TxType::BufferTx,
2379 },
2380 );
2381 } else {
2382 unreachable!();
2383 }
2384
2385 self.blockchain.send_transaction(&signed_fund_tx).await?;
2386
2387 self.store
2388 .upsert_channel(
2389 Channel::Signed(signed_channel),
2390 Some(Contract::Signed(signed_contract)),
2391 )
2392 .await?;
2393 self.store
2394 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2395 .await?;
2396
2397 Ok(())
2398 }
2399
2400 async fn on_settle_offer(
2401 &self,
2402 settle_offer: &SettleOffer,
2403 peer_id: &PublicKey,
2404 ) -> Result<Option<Reject>, Error> {
2405 let mut signed_channel =
2406 get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2407
2408 if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2409 return Ok(Some(Reject {
2410 channel_id: settle_offer.channel_id,
2411 }));
2412 }
2413
2414 crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2415
2416 self.store
2417 .upsert_channel(Channel::Signed(signed_channel), None)
2418 .await?;
2419
2420 Ok(None)
2421 }
2422
2423 async fn on_settle_accept(
2424 &self,
2425 settle_accept: &SettleAccept,
2426 peer_id: &PublicKey,
2427 ) -> Result<SettleConfirm, Error> {
2428 let mut signed_channel =
2429 get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2430
2431 let msg = crate::channel_updater::settle_channel_confirm(
2432 &self.secp,
2433 &mut signed_channel,
2434 settle_accept,
2435 CET_NSEQUENCE,
2436 0,
2437 PEER_TIMEOUT,
2438 &self.signer_provider,
2439 &self.time,
2440 &self.chain_monitor,
2441 )
2442 .await?;
2443
2444 self.store
2445 .upsert_channel(Channel::Signed(signed_channel), None)
2446 .await?;
2447
2448 Ok(msg)
2449 }
2450
2451 async fn on_settle_confirm(
2452 &self,
2453 settle_confirm: &SettleConfirm,
2454 peer_id: &PublicKey,
2455 ) -> Result<SettleFinalize, Error> {
2456 let mut signed_channel =
2457 get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2458 let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2459 let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2460 signed_channel,
2461 Established,
2462 buffer_transaction,
2463 own_buffer_adaptor_signature,
2464 is_offer,
2465 signed_contract_id
2466 )?;
2467
2468 let prev_buffer_txid = prev_buffer_tx.compute_txid();
2469 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2470 let is_offer = *is_offer;
2471 let signed_contract_id = *signed_contract_id;
2472
2473 let msg = crate::channel_updater::settle_channel_finalize(
2474 &self.secp,
2475 &mut signed_channel,
2476 settle_confirm,
2477 &self.signer_provider,
2478 )?;
2479
2480 self.chain_monitor.lock().await.add_tx(
2481 prev_buffer_txid,
2482 ChannelInfo {
2483 channel_id: signed_channel.channel_id,
2484 tx_type: TxType::Revoked {
2485 update_idx: signed_channel.update_idx + 1,
2486 own_adaptor_signature: own_buffer_adaptor_signature,
2487 is_offer,
2488 revoked_tx_type: RevokedTxType::Buffer,
2489 },
2490 },
2491 );
2492
2493 let closed_contract = Contract::Closed(
2494 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2495 .await?,
2496 );
2497
2498 self.store
2499 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2500 .await?;
2501 self.store
2502 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2503 .await?;
2504
2505 Ok(msg)
2506 }
2507
2508 async fn on_settle_finalize(
2509 &self,
2510 settle_finalize: &SettleFinalize,
2511 peer_id: &PublicKey,
2512 ) -> Result<(), Error> {
2513 let mut signed_channel =
2514 get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2515 let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2516 let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2517 signed_channel,
2518 Established,
2519 buffer_transaction,
2520 own_buffer_adaptor_signature,
2521 is_offer,
2522 signed_contract_id
2523 )?;
2524
2525 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2526 let is_offer = *is_offer;
2527 let buffer_txid = buffer_tx.compute_txid();
2528 let signed_contract_id = *signed_contract_id;
2529
2530 crate::channel_updater::settle_channel_on_finalize(
2531 &self.secp,
2532 &mut signed_channel,
2533 settle_finalize,
2534 )?;
2535
2536 self.chain_monitor.lock().await.add_tx(
2537 buffer_txid,
2538 ChannelInfo {
2539 channel_id: signed_channel.channel_id,
2540 tx_type: TxType::Revoked {
2541 update_idx: signed_channel.update_idx + 1,
2542 own_adaptor_signature: own_buffer_adaptor_signature,
2543 is_offer,
2544 revoked_tx_type: RevokedTxType::Buffer,
2545 },
2546 },
2547 );
2548
2549 let closed_contract = Contract::Closed(
2550 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2551 .await?,
2552 );
2553 self.store
2554 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2555 .await?;
2556 self.store
2557 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2558 .await?;
2559
2560 Ok(())
2561 }
2562
2563 async fn on_renew_offer(
2564 &self,
2565 renew_offer: &RenewOffer,
2566 peer_id: &PublicKey,
2567 ) -> Result<Option<Reject>, Error> {
2568 let mut signed_channel =
2569 get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2570
2571 if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2573 if is_offer {
2574 return Ok(Some(Reject {
2575 channel_id: renew_offer.channel_id,
2576 }));
2577 }
2578 }
2579
2580 let offered_contract = crate::channel_updater::on_renew_offer(
2581 &mut signed_channel,
2582 renew_offer,
2583 PEER_TIMEOUT,
2584 &self.time,
2585 )?;
2586
2587 self.store.create_contract(&offered_contract).await?;
2588 self.store
2589 .upsert_channel(Channel::Signed(signed_channel), None)
2590 .await?;
2591
2592 Ok(None)
2593 }
2594
2595 async fn on_renew_accept(
2596 &self,
2597 renew_accept: &RenewAccept,
2598 peer_id: &PublicKey,
2599 ) -> Result<RenewConfirm, Error> {
2600 let mut signed_channel =
2601 get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2602 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2603 Error::InvalidState(
2604 "Expected to be in a state with an associated contract id but was not.".to_string(),
2605 )
2606 })?;
2607
2608 let offered_contract =
2609 get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2610
2611 let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2612 &self.secp,
2613 renew_accept,
2614 &mut signed_channel,
2615 &offered_contract,
2616 CET_NSEQUENCE,
2617 PEER_TIMEOUT,
2618 &self.wallet,
2619 &self.signer_provider,
2620 &self.time,
2621 &self.store,
2622 &self.logger,
2623 )
2624 .await?;
2625
2626 self.store
2628 .upsert_channel(
2629 Channel::Signed(signed_channel),
2630 Some(Contract::Confirmed(signed_contract)),
2631 )
2632 .await?;
2633
2634 Ok(msg)
2635 }
2636
2637 async fn on_renew_confirm(
2638 &self,
2639 renew_confirm: &RenewConfirm,
2640 peer_id: &PublicKey,
2641 ) -> Result<RenewFinalize, Error> {
2642 let mut signed_channel =
2643 get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2644 let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2645 let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2646 Error::InvalidState(
2647 "Expected to be in a state with an associated contract id but was not.".to_string(),
2648 )
2649 })?;
2650
2651 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2652 .roll_back_state
2653 .as_ref()
2654 .expect("to have a rollback state")
2655 {
2656 SignedChannelState::Established {
2657 own_buffer_adaptor_signature,
2658 buffer_transaction,
2659 signed_contract_id,
2660 ..
2661 } => {
2662 let closed_contract = Contract::Closed(
2663 self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2664 .await?,
2665 );
2666 (
2667 TxType::Revoked {
2668 update_idx: signed_channel.update_idx,
2669 own_adaptor_signature: *own_buffer_adaptor_signature,
2670 is_offer: false,
2671 revoked_tx_type: RevokedTxType::Buffer,
2672 },
2673 buffer_transaction.compute_txid(),
2674 Some(closed_contract),
2675 )
2676 }
2677 SignedChannelState::Settled {
2678 settle_tx,
2679 own_settle_adaptor_signature,
2680 ..
2681 } => (
2682 TxType::Revoked {
2683 update_idx: signed_channel.update_idx,
2684 own_adaptor_signature: *own_settle_adaptor_signature,
2685 is_offer: false,
2686 revoked_tx_type: RevokedTxType::Settle,
2687 },
2688 settle_tx.compute_txid(),
2689 None,
2690 ),
2691 s => {
2692 return Err(Error::InvalidState(format!(
2693 "Expected rollback state Established or Revoked but found {s:?}"
2694 )))
2695 }
2696 };
2697 let accepted_contract =
2698 get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2699
2700 let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2701 &self.secp,
2702 &mut signed_channel,
2703 &accepted_contract,
2704 renew_confirm,
2705 PEER_TIMEOUT,
2706 &self.time,
2707 &self.wallet,
2708 &self.signer_provider,
2709 &self.chain_monitor,
2710 &self.store,
2711 &self.logger,
2712 )
2713 .await?;
2714
2715 self.chain_monitor.lock().await.add_tx(
2716 prev_tx_id,
2717 ChannelInfo {
2718 channel_id: signed_channel.channel_id,
2719 tx_type,
2720 },
2721 );
2722
2723 self.store
2725 .upsert_channel(
2726 Channel::Signed(signed_channel),
2727 Some(Contract::Confirmed(signed_contract)),
2728 )
2729 .await?;
2730
2731 self.store
2732 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2733 .await?;
2734
2735 if let Some(closed_contract) = closed_contract {
2736 self.store.update_contract(&closed_contract).await?;
2737 }
2738
2739 Ok(msg)
2740 }
2741
2742 async fn on_renew_finalize(
2743 &self,
2744 renew_finalize: &RenewFinalize,
2745 peer_id: &PublicKey,
2746 ) -> Result<RenewRevoke, Error> {
2747 let mut signed_channel =
2748 get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2749 let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2750
2751 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2752 .roll_back_state
2753 .as_ref()
2754 .expect("to have a rollback state")
2755 {
2756 SignedChannelState::Established {
2757 own_buffer_adaptor_signature,
2758 buffer_transaction,
2759 signed_contract_id,
2760 ..
2761 } => {
2762 let closed_contract = self
2763 .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2764 .await?;
2765 (
2766 TxType::Revoked {
2767 update_idx: signed_channel.update_idx,
2768 own_adaptor_signature: *own_buffer_adaptor_signature,
2769 is_offer: false,
2770 revoked_tx_type: RevokedTxType::Buffer,
2771 },
2772 buffer_transaction.compute_txid(),
2773 Some(Contract::Closed(closed_contract)),
2774 )
2775 }
2776 SignedChannelState::Settled {
2777 settle_tx,
2778 own_settle_adaptor_signature,
2779 ..
2780 } => (
2781 TxType::Revoked {
2782 update_idx: signed_channel.update_idx,
2783 own_adaptor_signature: *own_settle_adaptor_signature,
2784 is_offer: false,
2785 revoked_tx_type: RevokedTxType::Settle,
2786 },
2787 settle_tx.compute_txid(),
2788 None,
2789 ),
2790 s => {
2791 return Err(Error::InvalidState(format!(
2792 "Expected rollback state of Established or Settled but was {s:?}"
2793 )))
2794 }
2795 };
2796
2797 let msg = crate::channel_updater::renew_channel_on_finalize(
2798 &self.secp,
2799 &mut signed_channel,
2800 renew_finalize,
2801 &self.signer_provider,
2802 )?;
2803
2804 self.chain_monitor.lock().await.add_tx(
2805 prev_tx_id,
2806 ChannelInfo {
2807 channel_id: signed_channel.channel_id,
2808 tx_type,
2809 },
2810 );
2811
2812 let buffer_tx =
2813 get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2814
2815 self.chain_monitor.lock().await.add_tx(
2816 buffer_tx.compute_txid(),
2817 ChannelInfo {
2818 channel_id: signed_channel.channel_id,
2819 tx_type: TxType::BufferTx,
2820 },
2821 );
2822
2823 self.store
2824 .upsert_channel(Channel::Signed(signed_channel), None)
2825 .await?;
2826 self.store
2827 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2828 .await?;
2829
2830 if let Some(closed_contract) = closed_contract {
2831 self.store.update_contract(&closed_contract).await?;
2832 }
2833
2834 Ok(msg)
2835 }
2836
2837 async fn on_renew_revoke(
2838 &self,
2839 renew_revoke: &RenewRevoke,
2840 peer_id: &PublicKey,
2841 ) -> Result<(), Error> {
2842 let mut signed_channel =
2843 get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2844
2845 crate::channel_updater::renew_channel_on_revoke(
2846 &self.secp,
2847 &mut signed_channel,
2848 renew_revoke,
2849 )?;
2850
2851 self.store
2852 .upsert_channel(Channel::Signed(signed_channel), None)
2853 .await?;
2854
2855 Ok(())
2856 }
2857
2858 async fn on_collaborative_close_offer(
2859 &self,
2860 close_offer: &CollaborativeCloseOffer,
2861 peer_id: &PublicKey,
2862 ) -> Result<(), Error> {
2863 let mut signed_channel =
2864 get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2865
2866 crate::channel_updater::on_collaborative_close_offer(
2867 &mut signed_channel,
2868 close_offer,
2869 PEER_TIMEOUT,
2870 &self.time,
2871 )?;
2872
2873 self.store
2874 .upsert_channel(Channel::Signed(signed_channel), None)
2875 .await?;
2876
2877 Ok(())
2878 }
2879
2880 async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2881 let channel = self.store.get_channel(&reject.channel_id).await?;
2882
2883 if let Some(channel) = channel {
2884 if channel.get_counter_party_id() != *counter_party {
2885 return Err(Error::InvalidParameters(format!(
2886 "Peer {:02x?} is not involved with {} {:02x?}.",
2887 counter_party,
2888 stringify!(Channel),
2889 channel.get_id()
2890 )));
2891 }
2892 match channel {
2893 Channel::Offered(offered_channel) => {
2894 let offered_contract = get_contract_in_state!(
2895 self,
2896 &offered_channel.offered_contract_id,
2897 Offered,
2898 None as Option<PublicKey>
2899 )?;
2900 let utxos = offered_contract
2901 .funding_inputs
2902 .iter()
2903 .map(|funding_input| {
2904 let txid = Transaction::consensus_decode(
2905 &mut funding_input.prev_tx.as_slice(),
2906 )
2907 .expect("Transaction Decode Error")
2908 .compute_txid();
2909 let vout = funding_input.prev_tx_vout;
2910 OutPoint { txid, vout }
2911 })
2912 .collect::<Vec<_>>();
2913
2914 self.wallet.unreserve_utxos(&utxos)?;
2915
2916 self.store
2918 .upsert_channel(
2919 Channel::Cancelled(offered_channel),
2920 Some(Contract::Rejected(offered_contract)),
2921 )
2922 .await?;
2923 }
2924 Channel::Signed(mut signed_channel) => {
2925 let contract = match signed_channel.state {
2926 SignedChannelState::RenewOffered {
2927 offered_contract_id,
2928 ..
2929 } => {
2930 let offered_contract = get_contract_in_state!(
2931 self,
2932 &offered_contract_id,
2933 Offered,
2934 None::<PublicKey>
2935 )?;
2936 Some(Contract::Rejected(offered_contract))
2937 }
2938 _ => None,
2939 };
2940
2941 crate::channel_updater::on_reject(&mut signed_channel)?;
2942
2943 self.store
2944 .upsert_channel(Channel::Signed(signed_channel), contract)
2945 .await?;
2946 }
2947 channel => {
2948 return Err(Error::InvalidState(format!(
2949 "Not in a state adequate to receive a reject message. {channel:?}"
2950 )))
2951 }
2952 }
2953 } else {
2954 log_warn!(
2955 self.logger,
2956 "Couldn't find rejected dlc channel with id: {}",
2957 reject.channel_id.to_lower_hex_string()
2958 );
2959 }
2960
2961 Ok(())
2962 }
2963
2964 async fn channel_checks(&self) -> Result<(), Error> {
2965 let established_closing_channels = self
2966 .store
2967 .get_signed_channels(Some(SignedChannelStateType::Closing))
2968 .await?;
2969
2970 for channel in established_closing_channels {
2971 if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2972 log_error!(
2973 self.logger,
2974 "Error trying to close established channel: {}",
2975 e
2976 );
2977 }
2978 }
2979
2980 if let Err(e) = self.check_for_timed_out_channels().await {
2981 log_error!(self.logger, "Error checking timed out channels {}", e);
2982 }
2983 self.check_for_watched_tx().await
2984 }
2985
2986 async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
2987 check_for_timed_out_channels!(self, RenewOffered);
2988 check_for_timed_out_channels!(self, RenewAccepted);
2989 check_for_timed_out_channels!(self, RenewConfirmed);
2990 check_for_timed_out_channels!(self, SettledOffered);
2991 check_for_timed_out_channels!(self, SettledAccepted);
2992 check_for_timed_out_channels!(self, SettledConfirmed);
2993
2994 Ok(())
2995 }
2996
2997 pub(crate) async fn process_watched_txs(
2998 &self,
2999 watched_txs: Vec<(Transaction, ChannelInfo)>,
3000 ) -> Result<(), Error> {
3001 for (tx, channel_info) in watched_txs {
3002 let mut signed_channel = match get_channel_in_state!(
3003 self,
3004 &channel_info.channel_id,
3005 Signed,
3006 None as Option<PublicKey>
3007 ) {
3008 Ok(c) => c,
3009 Err(e) => {
3010 log_error!(
3011 self.logger,
3012 "Could not retrieve channel {:?}: {}",
3013 channel_info.channel_id,
3014 e
3015 );
3016 continue;
3017 }
3018 };
3019
3020 let persist = match channel_info.tx_type {
3021 TxType::BufferTx => {
3022 let contract_id = signed_channel
3029 .get_contract_id()
3030 .expect("to have a contract id");
3031 let mut state = SignedChannelState::Closing {
3032 buffer_transaction: tx.clone(),
3033 is_initiator: false,
3034 contract_id,
3035 keys_id: signed_channel.keys_id().ok_or_else(|| {
3036 Error::InvalidState("Expected to have keys_id.".to_string())
3037 })?,
3038 };
3039 std::mem::swap(&mut signed_channel.state, &mut state);
3040
3041 signed_channel.roll_back_state = Some(state);
3042
3043 self.store
3044 .upsert_channel(Channel::Signed(signed_channel), None)
3045 .await?;
3046
3047 false
3048 }
3049 TxType::Revoked {
3050 update_idx,
3051 own_adaptor_signature,
3052 is_offer,
3053 revoked_tx_type,
3054 } => {
3055 let secret = signed_channel
3056 .counter_party_commitment_secrets
3057 .get_secret(update_idx)
3058 .expect("to be able to retrieve the per update secret");
3059 let counter_per_update_secret = SecretKey::from_slice(&secret)
3060 .expect("to be able to parse the counter per update secret.");
3061
3062 let per_update_seed_pk = signed_channel.own_per_update_seed;
3063
3064 let per_update_seed_sk = self
3065 .signer_provider
3066 .get_secret_key_for_pubkey(&per_update_seed_pk)?;
3067
3068 let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
3069 per_update_seed_sk.as_ref(),
3070 update_idx,
3071 ))
3072 .expect("a valid secret key.");
3073
3074 let per_update_point =
3075 PublicKey::from_secret_key(&self.secp, &per_update_secret);
3076
3077 let own_revocation_params = signed_channel.own_points.get_revokable_params(
3078 &self.secp,
3079 &signed_channel.counter_points.revocation_basepoint,
3080 &per_update_point,
3081 );
3082
3083 let counter_per_update_point =
3084 PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
3085
3086 let base_own_sk = self
3087 .signer_provider
3088 .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
3089
3090 let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
3091
3092 let counter_revocation_params =
3093 signed_channel.counter_points.get_revokable_params(
3094 &self.secp,
3095 &signed_channel.own_points.revocation_basepoint,
3096 &counter_per_update_point,
3097 );
3098
3099 let witness = if signed_channel.own_params.fund_pubkey
3100 < signed_channel.counter_params.fund_pubkey
3101 {
3102 tx.input[0].witness.to_vec().remove(1)
3103 } else {
3104 tx.input[0].witness.to_vec().remove(2)
3105 };
3106
3107 let sig_data = witness
3108 .iter()
3109 .take(witness.len() - 1)
3110 .cloned()
3111 .collect::<Vec<_>>();
3112 let own_sig = Signature::from_der(&sig_data)?;
3113
3114 let counter_sk = own_adaptor_signature.recover(
3115 &self.secp,
3116 &own_sig,
3117 &counter_revocation_params.publish_pk.inner,
3118 )?;
3119
3120 let own_revocation_base_secret =
3121 &self.signer_provider.get_secret_key_for_pubkey(
3122 &signed_channel.own_points.revocation_basepoint,
3123 )?;
3124
3125 let counter_revocation_sk = derive_private_revocation_key(
3126 &self.secp,
3127 &counter_per_update_secret,
3128 own_revocation_base_secret,
3129 );
3130
3131 let (offer_params, accept_params) = if is_offer {
3132 (&own_revocation_params, &counter_revocation_params)
3133 } else {
3134 (&counter_revocation_params, &own_revocation_params)
3135 };
3136
3137 let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
3138 lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
3139 ) / 250)
3140 .into();
3141
3142 let signed_tx = match revoked_tx_type {
3143 RevokedTxType::Buffer => {
3144 ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
3145 &self.secp,
3146 offer_params,
3147 accept_params,
3148 &own_sk,
3149 &counter_sk,
3150 &counter_revocation_sk,
3151 &tx,
3152 &self.wallet.get_new_address().await?,
3153 0,
3154 fee_rate_per_vb,
3155 )?
3156 }
3157 RevokedTxType::Settle => {
3158 ddk_dlc::channel::create_and_sign_punish_settle_transaction(
3159 &self.secp,
3160 offer_params,
3161 accept_params,
3162 &own_sk,
3163 &counter_sk,
3164 &counter_revocation_sk,
3165 &tx,
3166 &self.wallet.get_new_address().await?,
3167 CET_NSEQUENCE,
3168 0,
3169 fee_rate_per_vb,
3170 is_offer,
3171 )?
3172 }
3173 };
3174
3175 self.blockchain.send_transaction(&signed_tx).await?;
3176
3177 let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
3178 counter_party: signed_channel.counter_party,
3179 temporary_channel_id: signed_channel.temporary_channel_id,
3180 channel_id: signed_channel.channel_id,
3181 punish_txid: signed_tx.compute_txid(),
3182 });
3183
3184 self.chain_monitor
3187 .lock()
3188 .await
3189 .cleanup_channel(signed_channel.channel_id);
3190 self.store.upsert_channel(closed_channel, None).await?;
3191 true
3192 }
3193 TxType::CollaborativeClose => {
3194 if let Some(SignedChannelState::Established {
3195 signed_contract_id, ..
3196 }) = signed_channel.roll_back_state
3197 {
3198 let counter_payout = get_signed_channel_state!(
3199 signed_channel,
3200 CollaborativeCloseOffered,
3201 counter_payout
3202 )?;
3203 let closed_contract = self
3204 .get_collaboratively_closed_contract(
3205 &signed_contract_id,
3206 *counter_payout,
3207 false,
3208 )
3209 .await?;
3210 self.store
3211 .update_contract(&Contract::Closed(closed_contract))
3212 .await?;
3213 }
3214 let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
3215 counter_party: signed_channel.counter_party,
3216 temporary_channel_id: signed_channel.temporary_channel_id,
3217 channel_id: signed_channel.channel_id,
3218 });
3219 self.chain_monitor
3220 .lock()
3221 .await
3222 .cleanup_channel(signed_channel.channel_id);
3223 self.store.upsert_channel(closed_channel, None).await?;
3224 true
3225 }
3226 TxType::SettleTx => {
3227 let closed_channel = Channel::CounterClosed(ClosedChannel {
3228 counter_party: signed_channel.counter_party,
3229 temporary_channel_id: signed_channel.temporary_channel_id,
3230 channel_id: signed_channel.channel_id,
3231 });
3232 self.chain_monitor
3233 .lock()
3234 .await
3235 .cleanup_channel(signed_channel.channel_id);
3236 self.store.upsert_channel(closed_channel, None).await?;
3237 true
3238 }
3239 TxType::Cet => {
3240 let contract_id = signed_channel.get_contract_id();
3241 let closed_channel = {
3242 match &signed_channel.state {
3243 SignedChannelState::Closing { is_initiator, .. } => {
3244 if *is_initiator {
3245 Channel::Closed(ClosedChannel {
3246 counter_party: signed_channel.counter_party,
3247 temporary_channel_id: signed_channel.temporary_channel_id,
3248 channel_id: signed_channel.channel_id,
3249 })
3250 } else {
3251 Channel::CounterClosed(ClosedChannel {
3252 counter_party: signed_channel.counter_party,
3253 temporary_channel_id: signed_channel.temporary_channel_id,
3254 channel_id: signed_channel.channel_id,
3255 })
3256 }
3257 }
3258 _ => {
3259 log_error!(self.logger, "Saw spending of buffer transaction without being in closing state");
3260 Channel::Closed(ClosedChannel {
3261 counter_party: signed_channel.counter_party,
3262 temporary_channel_id: signed_channel.temporary_channel_id,
3263 channel_id: signed_channel.channel_id,
3264 })
3265 }
3266 }
3267 };
3268
3269 self.chain_monitor
3270 .lock()
3271 .await
3272 .cleanup_channel(signed_channel.channel_id);
3273 let pre_closed_contract = futures::stream::iter(contract_id)
3274 .then(|contract_id| {
3275 let txn = tx.clone();
3276 async move {
3277 self.store.get_contract(&contract_id).await.map(|contract| {
3278 contract.and_then(|contract| match contract {
3279 Contract::Confirmed(signed_contract) => {
3280 Some(Contract::PreClosed(PreClosedContract {
3281 signed_contract,
3282 attestations: None,
3283 signed_cet: txn,
3284 }))
3285 }
3286 _ => None,
3287 })
3288 })
3289 }
3290 })
3291 .try_collect::<Vec<_>>()
3292 .await?
3293 .into_iter()
3294 .flatten()
3295 .next();
3296
3297 self.store
3298 .upsert_channel(closed_channel, pre_closed_contract)
3299 .await?;
3300
3301 true
3302 }
3303 };
3304
3305 if persist {
3306 self.store
3307 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3308 .await?;
3309 }
3310 }
3311 Ok(())
3312 }
3313
3314 async fn check_for_watched_tx(&self) -> Result<(), Error> {
3315 let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
3316
3317 self.process_watched_txs(confirmed_txs).await?;
3318
3319 self.store
3320 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3321 .await?;
3322
3323 Ok(())
3324 }
3325
3326 async fn force_close_channel_internal(
3327 &self,
3328 mut channel: SignedChannel,
3329 is_initiator: bool,
3330 ) -> Result<(), Error> {
3331 match &channel.state {
3332 SignedChannelState::Established {
3333 counter_buffer_adaptor_signature,
3334 buffer_transaction,
3335 ..
3336 } => {
3337 let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
3338 let buffer_transaction = buffer_transaction.clone();
3339 self.initiate_unilateral_close_established_channel(
3340 channel,
3341 is_initiator,
3342 counter_buffer_adaptor_signature,
3343 buffer_transaction,
3344 )
3345 .await
3346 }
3347 SignedChannelState::RenewFinalized {
3348 buffer_transaction,
3349 offer_buffer_adaptor_signature,
3350 ..
3351 } => {
3352 let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
3353 let buffer_transaction = buffer_transaction.clone();
3354 self.initiate_unilateral_close_established_channel(
3355 channel,
3356 is_initiator,
3357 offer_buffer_adaptor_signature,
3358 buffer_transaction,
3359 )
3360 .await
3361 }
3362 SignedChannelState::Settled { .. } => {
3363 self.close_settled_channel(channel, is_initiator).await
3364 }
3365 SignedChannelState::SettledOffered { .. }
3366 | SignedChannelState::SettledReceived { .. }
3367 | SignedChannelState::SettledAccepted { .. }
3368 | SignedChannelState::SettledConfirmed { .. }
3369 | SignedChannelState::RenewOffered { .. }
3370 | SignedChannelState::RenewAccepted { .. }
3371 | SignedChannelState::RenewConfirmed { .. }
3372 | SignedChannelState::CollaborativeCloseOffered { .. } => {
3373 channel.state = channel
3374 .roll_back_state
3375 .take()
3376 .expect("to have a rollback state");
3377 let channel_clone = channel.clone(); Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
3379 }
3380 SignedChannelState::Closing { .. } => Err(Error::InvalidState(
3381 "Channel is already closing.".to_string(),
3382 )),
3383 }
3384 }
3385
3386 async fn initiate_unilateral_close_established_channel(
3388 &self,
3389 mut signed_channel: SignedChannel,
3390 is_initiator: bool,
3391 buffer_adaptor_signature: EcdsaAdaptorSignature,
3392 buffer_transaction: Transaction,
3393 ) -> Result<(), Error> {
3394 let keys_id = signed_channel.keys_id().ok_or_else(|| {
3395 Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
3396 })?;
3397
3398 crate::channel_updater::initiate_unilateral_close_established_channel(
3399 &self.secp,
3400 &mut signed_channel,
3401 buffer_adaptor_signature,
3402 keys_id,
3403 buffer_transaction,
3404 &self.signer_provider,
3405 is_initiator,
3406 )?;
3407
3408 let buffer_transaction =
3409 get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3410
3411 self.blockchain.send_transaction(buffer_transaction).await?;
3412
3413 self.chain_monitor
3414 .lock()
3415 .await
3416 .remove_tx(&buffer_transaction.compute_txid());
3417
3418 self.store
3419 .upsert_channel(Channel::Signed(signed_channel), None)
3420 .await?;
3421
3422 self.store
3423 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3424 .await?;
3425
3426 Ok(())
3427 }
3428
3429 async fn close_settled_channel(
3431 &self,
3432 signed_channel: SignedChannel,
3433 is_initiator: bool,
3434 ) -> Result<(), Error> {
3435 let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3436 &self.secp,
3437 &signed_channel,
3438 &self.signer_provider,
3439 is_initiator,
3440 )?;
3441
3442 if self
3443 .blockchain
3444 .get_transaction_confirmations(&settle_tx.compute_txid())
3445 .await
3446 .unwrap_or(0)
3447 == 0
3448 {
3449 self.blockchain.send_transaction(&settle_tx).await?;
3450 }
3451
3452 self.chain_monitor
3453 .lock()
3454 .await
3455 .cleanup_channel(signed_channel.channel_id);
3456
3457 self.store.upsert_channel(closed_channel, None).await?;
3458
3459 Ok(())
3460 }
3461
3462 async fn get_collaboratively_closed_contract(
3463 &self,
3464 contract_id: &ContractId,
3465 payout: Amount,
3466 is_own_payout: bool,
3467 ) -> Result<ClosedContract, Error> {
3468 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3469 let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3470 contract
3471 .accepted_contract
3472 .offered_contract
3473 .offer_params
3474 .collateral
3475 } else {
3476 contract.accepted_contract.accept_params.collateral
3477 };
3478 let own_payout = if is_own_payout {
3479 payout
3480 } else {
3481 contract.accepted_contract.offered_contract.total_collateral - payout
3482 };
3483 let pnl =
3484 SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3485 Ok(ClosedContract {
3486 attestations: None,
3487 signed_cet: None,
3488 contract_id: *contract_id,
3489 temporary_contract_id: contract.accepted_contract.offered_contract.id,
3490 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3491 funding_txid: contract
3492 .accepted_contract
3493 .dlc_transactions
3494 .fund
3495 .compute_txid(),
3496 pnl,
3497 signed_contract: contract.clone(),
3498 })
3499 }
3500
3501 async fn oracle_announcements(
3502 &self,
3503 contract_input: &ContractInput,
3504 ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3505 let announcements = stream::iter(contract_input.contract_infos.iter())
3506 .map(|x| {
3507 let future = self.get_oracle_announcements(&x.oracles);
3508 async move {
3509 match future.await {
3510 Ok(result) => Ok(result),
3511 Err(e) => {
3512 log_error!(self.logger, "Failed to get oracle announcements: {}", e);
3513 Err(e)
3514 }
3515 }
3516 }
3517 })
3518 .collect::<FuturesUnordered<_>>()
3519 .await
3520 .try_collect::<Vec<_>>()
3521 .await?;
3522 Ok(announcements)
3523 }
3524}