1use super::{
4 Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13 accepted_contract::AcceptedContract, contract_info::ContractInfo,
14 contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15 signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16 FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::hex::DisplayHex;
26use bitcoin::{Address, Amount, SignedAmount};
27use bitcoin::{OutPoint, Transaction};
28use ddk_messages::channel::{
29 AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
30 RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
31 SettleOffer, SignChannel,
32};
33use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
34use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
35use futures::stream;
36use futures::stream::FuturesUnordered;
37use futures::{StreamExt, TryStreamExt};
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40 build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use lightning::util::logger::Logger;
43use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
44use once_cell::sync::Lazy;
45use secp256k1_zkp::XOnlyPublicKey;
46use secp256k1_zkp::{
47 ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
48};
49use std::collections::HashMap;
50use std::ops::Deref;
51use std::string::ToString;
52use std::sync::Arc;
53use tokio::sync::Mutex;
54
/// Fallback number of confirmations, used when the `NB_CONFIRMATIONS`
/// environment variable is unset or cannot be parsed as a `u32`.
pub const DEFAULT_NB_CONFIRMATIONS: u32 = 3;
57
58static NB_CONFIRMATIONS: Lazy<u32> = Lazy::new(|| match std::env::var("NB_CONFIRMATIONS") {
61 Ok(val) => val.parse().unwrap_or(DEFAULT_NB_CONFIRMATIONS),
62 Err(_) => DEFAULT_NB_CONFIRMATIONS,
63});
64
/// Refund delay handed to `offer_contract` when creating offers. Incoming offers are
/// validated with this value and twice this value as the two bounds.
/// NOTE(review): 86400 * 7 looks like seven days expressed in seconds — confirm the unit.
pub const REFUND_DELAY: u32 = 86400 * 7;
/// NOTE(review): presumably the nSequence (relative lock, in blocks) applied to CETs;
/// not used in the visible part of this file — confirm against its users.
pub const CET_NSEQUENCE: u32 = 288;
/// NOTE(review): presumably the time (in seconds) a peer has to answer before a channel
/// operation is considered timed out; not used in the visible part of this file — confirm.
pub const PEER_TIMEOUT: u64 = 3600;
72
/// Result of looking for a way to close a contract: the contract info and its adaptor
/// info for which enough attestations were gathered, together with the
/// `(index, attestation)` pairs that were retrieved. `None` when no contract info
/// reached its threshold.
type ClosableContractInfo<'a> = Option<(
    &'a ContractInfo,
    &'a AdaptorInfo,
    Vec<(usize, OracleAttestation)>,
)>;
78
/// Coordinates contract and channel handling on top of a wallet, signer provider,
/// blockchain backend, storage, oracles, clock, fee estimator and logger.
///
/// Every component is held through a `Deref` handle so callers can pass references,
/// `Arc`s or boxes.
#[derive(Debug)]
pub struct Manager<
    W: Deref,
    SP: Deref,
    B: Deref,
    S: Deref,
    O: Deref,
    T: Deref,
    F: Deref,
    X: ContractSigner,
    L: Deref,
> where
    W::Target: Wallet,
    SP::Target: ContractSignerProvider<Signer = X>,
    B::Target: Blockchain,
    S::Target: Storage,
    O::Target: Oracle,
    T::Target: Time,
    F::Target: FeeEstimator,
    L::Target: Logger,
{
    // Oracle clients, looked up by the oracle's x-only public key.
    oracles: HashMap<XOnlyPublicKey, O>,
    // Wallet handle; used in this file to import funding addresses.
    wallet: W,
    // Source of contract signers and signer key ids.
    signer_provider: SP,
    // Blockchain backend: height, blocks, confirmations, broadcasting.
    blockchain: B,
    // Persistent storage for contracts, channels and the chain monitor.
    store: S,
    // Shared secp256k1 context.
    secp: Secp256k1<All>,
    // Chain monitor state, guarded by an async mutex.
    chain_monitor: Mutex<ChainMonitor>,
    // Clock used for maturity and timeout comparisons.
    time: T,
    // Fee estimator; not used in the visible part of this file.
    fee_estimator: F,
    // Logger used by the `log_*!` macros.
    logger: L,
}
112
/// Shorthand for `get_object_in_state!` specialised to contracts: forwards the manager,
/// id, expected state and optional peer id, fixing the object type to `Contract` and
/// the store accessor to `get_contract`.
macro_rules! get_contract_in_state {
    ($mgr: ident, $id: expr, $expected: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected, $peer, Contract, get_contract)
    }};
}
125
/// Shorthand for `get_object_in_state!` specialised to channels: forwards the manager,
/// id, expected state and optional peer id, fixing the object type to `Channel` and
/// the store accessor to `get_channel`.
macro_rules! get_channel_in_state {
    ($mgr: ident, $id: expr, $expected: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected, $peer, Channel, get_channel)
    }};
}
138
/// Extracts the listed fields from a signed channel's `roll_back_state`, provided it is
/// the `SignedChannelState::$state` variant.
///
/// Evaluates to `Ok((field, ...))` with references to the requested fields, or to
/// `Err(Error::InvalidState(..))` when the rollback state is absent or is a different
/// variant. The error message reports the rollback state that was actually found,
/// since that is the value the match was performed on.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state{$($field,)* ..}) => Ok(($($field,)*)),
            _ => Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($state),
                $signed_channel.roll_back_state
            ))),
        }
    }};
}
147
/// Force closes every stored signed channel that is in `SignedChannelState::$state`
/// and whose `timeout` is strictly earlier than the manager's current unix time.
///
/// A failure to load the channels propagates with `?`; a failure to force close a
/// single channel is only logged, and the remaining channels are still processed.
macro_rules! check_for_timed_out_channels {
    ($mgr: ident, $state: ident) => {
        let candidates = $mgr
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))
            .await?;

        for signed_channel in candidates {
            // Skip channels that are not in the requested state.
            let SignedChannelState::$state { timeout, .. } = signed_channel.state else {
                continue;
            };
            // Not expired yet: the timeout has to be strictly in the past.
            if timeout >= $mgr.time.unix_time_now() {
                continue;
            }
            if let Err(e) = $mgr
                .force_close_channel_internal(signed_channel, true)
                .await
            {
                log_error!($mgr.logger, "Error force closing channel. error={}", e)
            }
        }
    };
}
170
171impl<
172 W: Deref,
173 SP: Deref,
174 B: Deref,
175 S: Deref,
176 O: Deref,
177 T: Deref,
178 F: Deref,
179 X: ContractSigner,
180 L: Deref,
181 > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
182where
183 W::Target: Wallet,
184 SP::Target: ContractSignerProvider<Signer = X>,
185 B::Target: Blockchain,
186 S::Target: Storage,
187 O::Target: Oracle,
188 T::Target: Time,
189 F::Target: FeeEstimator,
190 L::Target: Logger,
191{
192 #[allow(clippy::too_many_arguments)]
194 #[tracing::instrument(skip_all)]
195 pub async fn new(
196 wallet: W,
197 signer_provider: SP,
198 blockchain: B,
199 store: S,
200 oracles: HashMap<XOnlyPublicKey, O>,
201 time: T,
202 fee_estimator: F,
203 logger: L,
204 ) -> Result<Self, Error> {
205 let init_height = blockchain.get_blockchain_height().await?;
206 let chain_monitor = Mutex::new(
207 store
208 .get_chain_monitor()
209 .await?
210 .unwrap_or(ChainMonitor::new(init_height)),
211 );
212
213 let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
214
215 log_info!(logger, "Manager initialized");
216
217 Ok(Manager {
218 secp: secp256k1_zkp::Secp256k1::new(),
219 wallet,
220 signer_provider,
221 blockchain,
222 store,
223 oracles,
224 time,
225 fee_estimator,
226 chain_monitor,
227 logger,
228 })
229 }
230
231 pub fn get_store(&self) -> &S {
233 &self.store
234 }
235
236 #[tracing::instrument(skip_all)]
238 pub async fn on_dlc_message(
239 &self,
240 msg: &DlcMessage,
241 counter_party: PublicKey,
242 ) -> Result<Option<DlcMessage>, Error> {
243 match msg {
244 DlcMessage::Offer(o) => {
245 log_debug!(self.logger, "Received offer message");
246 self.on_offer_message(o, counter_party).await?;
247 Ok(None)
248 }
249 DlcMessage::Accept(a) => {
250 log_debug!(self.logger, "Received accept message");
251 Ok(Some(self.on_accept_message(a, &counter_party).await?))
252 }
253 DlcMessage::Sign(s) => {
254 log_debug!(self.logger, "Received sign message");
255 self.on_sign_message(s, &counter_party).await?;
256 Ok(None)
257 }
258 DlcMessage::Close(c) => {
259 log_debug!(self.logger, "Received close message");
260 self.on_close_message(c, &counter_party).await?;
261 Ok(None)
262 }
263 DlcMessage::OfferChannel(o) => {
264 log_debug!(self.logger, "Received offer channel message");
265 self.on_offer_channel(o, counter_party).await?;
266 Ok(None)
267 }
268 DlcMessage::AcceptChannel(a) => {
269 log_debug!(self.logger, "Received accept channel message");
270 Ok(Some(DlcMessage::SignChannel(
271 self.on_accept_channel(a, &counter_party).await?,
272 )))
273 }
274 DlcMessage::SignChannel(s) => {
275 log_debug!(self.logger, "Received sign channel message");
276 self.on_sign_channel(s, &counter_party).await?;
277 Ok(None)
278 }
279 DlcMessage::SettleOffer(s) => {
280 log_debug!(self.logger, "Received settle offer message");
281 match self.on_settle_offer(s, &counter_party).await? {
282 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
283 None => Ok(None),
284 }
285 }
286 DlcMessage::SettleAccept(s) => {
287 log_debug!(self.logger, "Received settle accept message");
288 Ok(Some(DlcMessage::SettleConfirm(
289 self.on_settle_accept(s, &counter_party).await?,
290 )))
291 }
292 DlcMessage::SettleConfirm(s) => {
293 log_debug!(self.logger, "Received settle confirm message");
294 Ok(Some(DlcMessage::SettleFinalize(
295 self.on_settle_confirm(s, &counter_party).await?,
296 )))
297 }
298 DlcMessage::SettleFinalize(s) => {
299 log_debug!(self.logger, "Received settle finalize message");
300 self.on_settle_finalize(s, &counter_party).await?;
301 Ok(None)
302 }
303 DlcMessage::RenewOffer(r) => {
304 log_debug!(self.logger, "Received renew offer message");
305 match self.on_renew_offer(r, &counter_party).await? {
306 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
307 None => Ok(None),
308 }
309 }
310 DlcMessage::RenewAccept(r) => {
311 log_debug!(self.logger, "Received renew accept message");
312 Ok(Some(DlcMessage::RenewConfirm(
313 self.on_renew_accept(r, &counter_party).await?,
314 )))
315 }
316 DlcMessage::RenewConfirm(r) => {
317 log_debug!(self.logger, "Received renew confirm message");
318 Ok(Some(DlcMessage::RenewFinalize(
319 self.on_renew_confirm(r, &counter_party).await?,
320 )))
321 }
322 DlcMessage::RenewFinalize(r) => {
323 log_debug!(self.logger, "Received renew finalize message");
324 let revoke = self.on_renew_finalize(r, &counter_party).await?;
325 Ok(Some(DlcMessage::RenewRevoke(revoke)))
326 }
327 DlcMessage::RenewRevoke(r) => {
328 log_debug!(self.logger, "Received renew revoke message");
329 self.on_renew_revoke(r, &counter_party).await?;
330 Ok(None)
331 }
332 DlcMessage::CollaborativeCloseOffer(c) => {
333 log_debug!(self.logger, "Received collaborative close offer message");
334 self.on_collaborative_close_offer(c, &counter_party).await?;
335 Ok(None)
336 }
337 DlcMessage::Reject(r) => {
338 log_debug!(self.logger, "Received reject message");
339 self.on_reject(r, &counter_party).await?;
340 Ok(None)
341 }
342 }
343 }
344
345 #[tracing::instrument(skip_all)]
347 pub async fn send_splice_offer(
348 &self,
349 contract_input: &ContractInput,
350 counter_party: PublicKey,
351 contract_id: &ContractId,
352 ) -> Result<OfferDlc, Error> {
353 let oracle_announcements = self.oracle_announcements(contract_input).await?;
354
355 self.send_splice_offer_with_announcements(
356 contract_input,
357 counter_party,
358 contract_id,
359 oracle_announcements,
360 )
361 .await
362 }
363
364 #[tracing::instrument(skip_all)]
367 pub async fn send_splice_offer_with_announcements(
368 &self,
369 contract_input: &ContractInput,
370 counter_party: PublicKey,
371 contract_id: &ContractId,
372 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
373 ) -> Result<OfferDlc, Error> {
374 let confirmed_contract =
375 get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
376
377 let dlc_input = confirmed_contract.get_dlc_input();
378
379 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
380 &self.secp,
381 contract_input,
382 oracle_announcements,
383 vec![dlc_input],
384 REFUND_DELAY,
385 &counter_party,
386 &self.wallet,
387 &self.blockchain,
388 &self.time,
389 &self.signer_provider,
390 &self.logger,
391 )
392 .await?;
393
394 offered_contract.validate()?;
395
396 self.store.create_contract(&offered_contract).await?;
397
398 Ok(offer_msg)
399 }
400
401 #[tracing::instrument(skip_all)]
406 pub async fn send_offer(
407 &self,
408 contract_input: &ContractInput,
409 counter_party: PublicKey,
410 ) -> Result<OfferDlc, Error> {
411 let oracle_announcements = self.oracle_announcements(contract_input).await?;
413
414 self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
415 .await
416 }
417
418 #[tracing::instrument(skip_all)]
424 pub async fn send_offer_with_announcements(
425 &self,
426 contract_input: &ContractInput,
427 counter_party: PublicKey,
428 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
429 ) -> Result<OfferDlc, Error> {
430 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
431 &self.secp,
432 contract_input,
433 oracle_announcements,
434 vec![],
435 REFUND_DELAY,
436 &counter_party,
437 &self.wallet,
438 &self.blockchain,
439 &self.time,
440 &self.signer_provider,
441 &self.logger,
442 )
443 .await?;
444
445 offered_contract.validate()?;
446
447 self.store.create_contract(&offered_contract).await?;
448
449 Ok(offer_msg)
450 }
451
452 #[tracing::instrument(skip_all)]
454 pub async fn accept_contract_offer(
455 &self,
456 contract_id: &ContractId,
457 ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
458 let offered_contract =
459 get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
460
461 let counter_party = offered_contract.counter_party;
462
463 let (accepted_contract, accept_msg) = accept_contract(
464 &self.secp,
465 &offered_contract,
466 &self.wallet,
467 &self.signer_provider,
468 &self.blockchain,
469 &self.logger,
470 )
471 .await?;
472
473 self.wallet.import_address(&Address::p2wsh(
474 &accepted_contract.dlc_transactions.funding_script_pubkey,
475 self.blockchain.get_network()?,
476 ))?;
477
478 let contract_id = accepted_contract.get_contract_id();
479
480 self.store
481 .update_contract(&Contract::Accepted(accepted_contract))
482 .await?;
483
484 log_info!(
485 self.logger,
486 "Accepted and stored the contract. temp_id={} contract_id={}",
487 offered_contract.id.to_lower_hex_string(),
488 contract_id.to_lower_hex_string(),
489 );
490
491 Ok((contract_id, counter_party, accept_msg))
492 }
493
494 #[tracing::instrument(skip_all)]
500 pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
501 let cur_height = self.blockchain.get_blockchain_height().await?;
502 let last_height = self.chain_monitor.lock().await.last_height;
503
504 if cur_height < last_height {
507 return Err(Error::InvalidState(
508 "Current height is lower than last height.".to_string(),
509 ));
510 }
511
512 for height in last_height + 1..=cur_height {
513 let block = self.blockchain.get_block_at_height(height).await?;
514
515 self.chain_monitor
516 .lock()
517 .await
518 .process_block(&block, height);
519 }
520
521 Ok(())
522 }
523
524 #[tracing::instrument(skip_all, level = "debug")]
527 pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
528 let signed_contracts = self.check_for_spliced_contract().await?;
529 self.check_signed_contracts(&signed_contracts).await?;
530 self.check_confirmed_contracts().await?;
531 self.check_preclosed_contracts().await?;
532
533 if check_channels {
534 self.channel_checks().await?;
535 }
536
537 Ok(())
538 }
539
540 #[tracing::instrument(skip_all)]
542 pub async fn on_offer_message(
543 &self,
544 offered_message: &OfferDlc,
545 counter_party: PublicKey,
546 ) -> Result<(), Error> {
547 offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
548 let keys_id = self
549 .signer_provider
550 .derive_signer_key_id(false, offered_message.temporary_contract_id);
551 let contract: OfferedContract =
552 OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
553 contract.validate()?;
554
555 if self.store.get_contract(&contract.id).await?.is_some() {
556 return Err(Error::InvalidParameters(
557 "Contract with identical id already exists".to_string(),
558 ));
559 }
560
561 self.store.create_contract(&contract).await?;
562 log_info!(
563 self.logger,
564 "Created and stored the offered contract. temp_id={}",
565 contract.id.to_lower_hex_string(),
566 );
567 Ok(())
568 }
569
570 #[tracing::instrument(skip_all)]
572 pub async fn on_close_message(
573 &self,
574 close_msg: &CloseDlc,
575 counter_party: &PublicKey,
576 ) -> Result<(), Error> {
577 let signed_contract = get_contract_in_state!(
579 self,
580 &close_msg.contract_id,
581 Confirmed,
582 Some(*counter_party)
583 )?;
584
585 let _close_tx = crate::contract_updater::complete_cooperative_close(
588 &self.secp,
589 &signed_contract,
590 close_msg,
591 &self.signer_provider,
592 &self.logger,
593 )?;
594
595 Ok(())
598 }
599
600 #[tracing::instrument(skip_all)]
602 pub async fn on_accept_message(
603 &self,
604 accept_msg: &AcceptDlc,
605 counter_party: &PublicKey,
606 ) -> Result<DlcMessage, Error> {
607 let offered_contract = get_contract_in_state!(
608 self,
609 &accept_msg.temporary_contract_id,
610 Offered,
611 Some(*counter_party)
612 )?;
613
614 let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
615 &self.secp,
616 &offered_contract,
617 accept_msg,
618 &self.wallet,
619 &self.signer_provider,
620 &self.store,
621 &self.logger,
622 )
623 .await
624 {
625 Ok(contract) => contract,
626 Err(e) => {
627 log_error!(
628 self.logger,
629 "Error in on_accept_message. tmp_contract_id={} error={}",
630 offered_contract.id.to_lower_hex_string(),
631 e.to_string()
632 );
633 return self
634 .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
635 .await;
636 }
637 };
638
639 log_info!(
640 self.logger,
641 "Verified the accept message and signed the contract. temp_id={} contract_id={}",
642 offered_contract.id.to_lower_hex_string(),
643 signed_contract.accepted_contract.get_contract_id_string(),
644 );
645
646 let contract_id = signed_contract.accepted_contract.get_contract_id_string();
647
648 self.wallet.import_address(&Address::p2wsh(
649 &signed_contract
650 .accepted_contract
651 .dlc_transactions
652 .funding_script_pubkey,
653 self.blockchain.get_network()?,
654 ))?;
655
656 self.store
657 .update_contract(&Contract::Signed(signed_contract))
658 .await?;
659
660 log_info!(
661 self.logger,
662 "Accepted and signed the contract. temp_id={} contract_id={}",
663 offered_contract.id.to_lower_hex_string(),
664 contract_id,
665 );
666
667 Ok(DlcMessage::Sign(signed_msg))
668 }
669
670 #[tracing::instrument(skip_all)]
672 pub async fn on_sign_message(
673 &self,
674 sign_message: &SignDlc,
675 peer_id: &PublicKey,
676 ) -> Result<(), Error> {
677 let accepted_contract =
678 get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
679 let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
680 &self.secp,
681 &accepted_contract,
682 sign_message,
683 &self.wallet,
684 &self.store,
685 &self.signer_provider,
686 &self.logger,
687 )
688 .await
689 {
690 Ok(contract) => contract,
691 Err(e) => {
692 log_error!(
693 self.logger,
694 "Error in on_sign_message. contract_id={} error={}",
695 accepted_contract.get_contract_id_string(),
696 e.to_string()
697 );
698 return self
699 .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
700 .await;
701 }
702 };
703
704 self.store
705 .update_contract(&Contract::Signed(signed_contract.clone()))
706 .await?;
707
708 self.blockchain.send_transaction(&fund_tx).await?;
709
710 if accepted_contract
713 .offered_contract
714 .funding_inputs
715 .iter()
716 .any(|c| c.dlc_input.is_some())
717 {
718 let Some(dlc_input) = accepted_contract
719 .offered_contract
720 .funding_inputs
721 .iter()
722 .find(|c| c.dlc_input.is_some())
723 else {
724 return Ok(());
725 };
726 let preclosed_contract = get_contract_in_state!(
727 self,
728 &dlc_input.dlc_input.as_ref().unwrap().contract_id,
729 Confirmed,
730 None as Option<PublicKey>
731 )?;
732
733 let preclosed_contract = PreClosedContract {
734 signed_contract: preclosed_contract.clone(),
735 attestations: None,
736 signed_cet: signed_contract
737 .accepted_contract
738 .dlc_transactions
739 .fund
740 .clone(),
741 };
742
743 log_debug!(self.logger,
744 "Contract contains a DLC input. Marking the previous contract as pre-closed. dlc_input_contract_id={}",
745 preclosed_contract.signed_contract.accepted_contract.get_contract_id_string()
746 );
747
748 self.store
749 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
750 .await?;
751 }
752
753 Ok(())
754 }
755
756 #[tracing::instrument(skip_all, level = "debug")]
757 async fn get_oracle_announcements(
758 &self,
759 oracle_inputs: &OracleInput,
760 ) -> Result<Vec<OracleAnnouncement>, Error> {
761 let mut announcements = Vec::new();
762 for pubkey in &oracle_inputs.public_keys {
763 let oracle = self
764 .oracles
765 .get(pubkey)
766 .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
767 let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
768 announcements.push(announcement);
769 }
770
771 Ok(announcements)
772 }
773
774 async fn sign_fail_on_error<R>(
775 &self,
776 accepted_contract: AcceptedContract,
777 sign_message: SignDlc,
778 e: Error,
779 ) -> Result<R, Error> {
780 log_error!(
781 self.logger,
782 "Error in on_sign_message marking contract as failed sign. contract_id={} error={}",
783 accepted_contract.get_contract_id_string(),
784 e.to_string()
785 );
786 self.store
787 .update_contract(&Contract::FailedSign(FailedSignContract {
788 accepted_contract,
789 sign_message,
790 error_message: e.to_string(),
791 }))
792 .await?;
793 Err(e)
794 }
795
796 async fn accept_fail_on_error<R>(
797 &self,
798 offered_contract: OfferedContract,
799 accept_message: AcceptDlc,
800 e: Error,
801 ) -> Result<R, Error> {
802 log_error!(
803 self.logger,
804 "Error in on_accept_message marking contract as failed accept. contract_id={} error={}",
805 offered_contract.id.to_lower_hex_string(),
806 e.to_string()
807 );
808 self.store
809 .update_contract(&Contract::FailedAccept(FailedAcceptContract {
810 offered_contract,
811 accept_message,
812 error_message: e.to_string(),
813 }))
814 .await?;
815 Err(e)
816 }
817
818 #[tracing::instrument(skip_all, level = "debug")]
819 async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
820 let confirmations = self
821 .blockchain
822 .get_transaction_confirmations(
823 &contract
824 .accepted_contract
825 .dlc_transactions
826 .fund
827 .compute_txid(),
828 )
829 .await?;
830 if confirmations >= *NB_CONFIRMATIONS {
831 log_info!(
832 self.logger,
833 "Marking signed contract as confirmed. confirmations={} contract_id={}",
834 confirmations,
835 contract.accepted_contract.get_contract_id_string(),
836 );
837 self.store
838 .update_contract(&Contract::Confirmed(contract.clone()))
839 .await?;
840 } else {
841 log_debug!(self.logger,
842 "Not enough confirmations to mark contract as confirmed. confirmations={} required={} contract_id={}",
843 confirmations,
844 *NB_CONFIRMATIONS,
845 contract.accepted_contract.get_contract_id_string(),
846 );
847 }
848 Ok(())
849 }
850
851 #[tracing::instrument(skip_all, level = "debug")]
852 async fn check_signed_contracts(
853 &self,
854 signed_contracts: &[SignedContract],
855 ) -> Result<(), Error> {
856 for c in signed_contracts {
857 if let Err(e) = self.check_signed_contract(c).await {
858 log_error!(
859 self.logger,
860 "Error checking signed contract. contract_id={} error={}",
861 c.accepted_contract.get_contract_id_string(),
862 e.to_string()
863 )
864 }
865 }
866
867 Ok(())
868 }
869
870 #[tracing::instrument(skip_all, level = "debug")]
871 async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
872 let contracts = self.get_store().get_signed_contracts().await?;
873 for contract in &contracts {
874 let dlc_input = contract
875 .accepted_contract
876 .offered_contract
877 .funding_inputs
878 .iter()
879 .find(|d| d.dlc_input.is_some());
880
881 let Some(funding_input) = dlc_input else {
882 continue;
883 };
884
885 let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
886
887 let confirmed_contract_in_splice = match get_contract_in_state!(
888 self,
889 &contract_id,
890 Confirmed,
891 None as Option<PublicKey>
892 ) {
893 Ok(c) => Ok(c),
894 Err(Error::InvalidState(e)) => {
895 log_trace!(self.logger,
896 "The previouse contract referenced in a splice transaction is in an unexpected state. contract_id={} error={}",
897 contract_id.to_lower_hex_string(), e.to_string(),
898 );
899 continue;
900 }
901 Err(e) => {
902 log_debug!(self.logger,
903 "The previouse contract referenced in a splice transaction failed to retrieve. contract_id={} error={}",
904 contract_id.to_lower_hex_string(), e.to_string(),
905 );
906 Err(e)
907 }
908 }?;
909
910 let preclosed_contract = PreClosedContract {
911 signed_contract: confirmed_contract_in_splice.clone(),
912 attestations: None,
913 signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
914 };
915
916 log_debug!(self.logger,
917 "The previous contract that was spliced is now closed because the splice contract is confirmed. contract_id={}",
918 contract_id.to_lower_hex_string(),
919 );
920
921 self.store
922 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
923 .await?;
924 }
925 Ok(contracts)
926 }
927
928 #[tracing::instrument(skip_all, level = "debug")]
929 async fn check_confirmed_contracts(&self) -> Result<(), Error> {
930 for c in self.store.get_confirmed_contracts().await? {
931 if c.channel_id.is_some() {
933 continue;
934 }
935 if let Err(e) = self.check_confirmed_contract(&c).await {
936 log_error!(
937 self.logger,
938 "Error checking confirmed contract. contract_id={} error={}",
939 c.accepted_contract.get_contract_id_string(),
940 e.to_string()
941 )
942 }
943 }
944
945 Ok(())
946 }
947
948 #[tracing::instrument(skip_all, level = "debug")]
949 async fn get_closable_contract_info<'a>(
950 &'a self,
951 contract: &'a SignedContract,
952 ) -> ClosableContractInfo<'a> {
953 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
954 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
955 for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
956 log_debug!(
957 self.logger,
958 "Checking contract for oracle maturation. contract_id={}",
959 contract.accepted_contract.get_contract_id_string()
960 );
961 let matured: Vec<_> = contract_info
962 .oracle_announcements
963 .iter()
964 .filter(|x| {
965 (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
966 })
967 .enumerate()
968 .collect();
969 if matured.len() >= contract_info.threshold {
970 let attestations = stream::iter(matured.iter())
971 .map(|(i, announcement)| async move {
972 log_debug!(self.logger,
973 "Oracle announcement for contract is matured. Getting attestations. contract_id={} event_id={}",
974 contract.accepted_contract.get_contract_id_string(),
975 announcement.oracle_event.event_id
976 );
977 let oracle = match self.oracles.get(&announcement.oracle_public_key) {
979 Some(oracle) => oracle,
980 None => {
981 log_debug!(self.logger,
982 "Oracle not found. pubkey={}. contract_id={} event_id={}",
983 announcement.oracle_public_key,
984 contract.accepted_contract.get_contract_id_string(),
985 announcement.oracle_event.event_id
986 );
987 return None;
988 }
989 };
990 let attestation = match oracle
992 .get_attestation(&announcement.oracle_event.event_id)
993 .await
994 {
995 Ok(attestation) => attestation,
996 Err(e) => {
997 log_error!(self.logger,
998 "Attestation not found for event. pubkey={} event_id={} error={}",
999 announcement.oracle_public_key,
1000 announcement.oracle_event.event_id,
1001 e.to_string()
1002 );
1003 return None;
1004 }
1005 };
1006 if let Err(e) = attestation.validate(&self.secp, announcement) {
1008 log_error!(self.logger,
1009 "Oracle attestation is not valid. pubkey={} event_id={}, error={}",
1010 announcement.oracle_public_key,
1011 announcement.oracle_event.event_id,
1012 e.to_string()
1013 );
1014 return None;
1015 }
1016 log_info!(self.logger,
1017 "Retrieved a valid attestation. pubkey={} event_id={} outcomes={:?}",
1018 announcement.oracle_public_key,
1019 announcement.oracle_event.event_id,
1020 attestation.outcomes
1021 );
1022 Some((*i, attestation))
1023 })
1024 .collect::<FuturesUnordered<_>>()
1025 .await
1026 .filter_map(|result| async move { result }) .collect::<Vec<_>>()
1028 .await;
1029 if attestations.len() >= contract_info.threshold {
1030 log_info!(self.logger,
1031 "Found enough attestations to close contract. contract_id={} attestations={}",
1032 contract.accepted_contract.get_contract_id_string(),
1033 attestations.len()
1034 );
1035 return Some((contract_info, adaptor_info, attestations));
1036 }
1037 }
1038 }
1039 None
1040 }
1041
1042 #[tracing::instrument(skip_all, level = "debug")]
1043 async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
1044 log_debug!(
1045 self.logger,
1046 "Checking confirmed contract. contract_id={}",
1047 contract.accepted_contract.get_contract_id_string()
1048 );
1049 let closable_contract_info = self.get_closable_contract_info(contract).await;
1050 if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
1051 log_debug!(
1052 self.logger,
1053 "Found closable contract info. contract_id={} attestations={}",
1054 contract.accepted_contract.get_contract_id_string(),
1055 attestations.len()
1056 );
1057 let offer = &contract.accepted_contract.offered_contract;
1058 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1059
1060 if let Ok(cet) = crate::contract_updater::get_signed_cet(
1066 &self.secp,
1067 contract,
1068 contract_info,
1069 adaptor_info,
1070 &attestations,
1071 &signer,
1072 &self.logger,
1073 ) {
1074 log_info!(
1075 self.logger,
1076 "Found valid CET. Closing contract. contract_id={}",
1077 contract.accepted_contract.get_contract_id_string()
1078 );
1079 match self
1080 .close_contract(
1081 contract,
1082 cet,
1083 attestations.iter().map(|x| x.1.clone()).collect(),
1084 )
1085 .await
1086 {
1087 Ok(closed_contract) => {
1088 log_info!(
1089 self.logger,
1090 "Updated contract to closed. contract_id={}",
1091 contract.accepted_contract.get_contract_id_string()
1092 );
1093 self.store.update_contract(&closed_contract).await?;
1094 return Ok(());
1095 }
1096 Err(e) => {
1097 log_warn!(
1098 self.logger,
1099 "Failed to close contract. contract_id={} error={}",
1100 contract.accepted_contract.get_contract_id_string(),
1101 e.to_string()
1102 );
1103 return Err(e);
1104 }
1105 }
1106 }
1107 }
1108
1109 for pending_close_tx in &contract
1111 .accepted_contract
1112 .dlc_transactions
1113 .pending_close_txs
1114 {
1115 let confirmations = self
1116 .blockchain
1117 .get_transaction_confirmations(&pending_close_tx.compute_txid())
1118 .await?;
1119
1120 log_debug!(
1121 self.logger,
1122 "Checking pending close transaction. close_txid={} contract_id={} confirmations={}",
1123 pending_close_tx.compute_txid().to_string(),
1124 contract.accepted_contract.get_contract_id_string(),
1125 confirmations
1126 );
1127
1128 if confirmations >= *NB_CONFIRMATIONS {
1129 log_info!(self.logger,
1131 "Pending close transaction is fully confirmed. Moving to closed. close_txid={} contract_id={}",
1132 pending_close_tx.compute_txid().to_string(),
1133 contract.accepted_contract.get_contract_id_string()
1134 );
1135 let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
1136 let closed_contract = ClosedContract {
1137 attestations: None, signed_cet: Some(pending_close_tx.clone()), contract_id: contract.accepted_contract.get_contract_id(),
1140 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1141 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1142 pnl,
1143 funding_txid: contract
1144 .accepted_contract
1145 .dlc_transactions
1146 .fund
1147 .compute_txid(),
1148 signed_contract: contract.clone(),
1149 };
1150
1151 self.store
1152 .update_contract(&Contract::Closed(closed_contract))
1153 .await?;
1154 break; } else if confirmations >= 1 {
1156 log_debug!(self.logger,
1158 "Found a confirmed but not fully confirmed pending close. Moving to preclosed. close_txid={} contract_id={}",
1159 pending_close_tx.compute_txid().to_string(),
1160 contract.accepted_contract.get_contract_id_string()
1161 );
1162 let preclosed_contract = PreClosedContract {
1163 signed_contract: contract.clone(),
1164 attestations: None, signed_cet: pending_close_tx.clone(),
1166 };
1167
1168 self.store
1169 .update_contract(&Contract::PreClosed(preclosed_contract))
1170 .await?;
1171 break; }
1173 }
1174
1175 self.check_refund(contract).await?;
1176
1177 Ok(())
1178 }
1179
1180 #[tracing::instrument(skip_all, level = "debug")]
1182 pub async fn close_confirmed_contract(
1183 &self,
1184 contract_id: &ContractId,
1185 attestations: Vec<(usize, OracleAttestation)>,
1186 ) -> Result<Contract, Error> {
1187 log_info!(
1188 self.logger,
1189 "Attempting to close confirmed contract manually. contract_id={} outcomes={:?}",
1190 contract_id.to_lower_hex_string(),
1191 attestations
1192 .iter()
1193 .map(|(_, a)| a.outcomes.clone())
1194 .collect::<Vec<_>>()
1195 );
1196 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1197 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
1198 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
1199
1200 if let Some((contract_info, adaptor_info)) =
1202 contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
1203 let matches = attestations
1204 .iter()
1205 .filter(|(i, a)| {
1206 c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
1207 })
1208 .count();
1209
1210 matches >= c.threshold
1211 })
1212 {
1213 let offer = &contract.accepted_contract.offered_contract;
1214 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1215 log_debug!(
1216 self.logger,
1217 "Getting signed CET. contract_id={}",
1218 contract.accepted_contract.get_contract_id_string()
1219 );
1220 let cet = crate::contract_updater::get_signed_cet(
1221 &self.secp,
1222 &contract,
1223 contract_info,
1224 adaptor_info,
1225 &attestations,
1226 &signer,
1227 &self.logger,
1228 )?;
1229
1230 let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
1232 .expect("Time is not in valid range. This should never happen.");
1233 let height =
1234 Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
1235 .expect("Height is not in valid range. This should never happen.");
1236 let locktime = cet.lock_time;
1237
1238 if !locktime.is_satisfied_by(height, time) {
1239 return Err(Error::InvalidState(
1240 "CET lock time has not passed yet".to_string(),
1241 ));
1242 }
1243
1244 match self
1245 .close_contract(
1246 &contract,
1247 cet,
1248 attestations.into_iter().map(|x| x.1).collect(),
1249 )
1250 .await
1251 {
1252 Ok(closed_contract) => {
1253 log_info!(
1254 self.logger,
1255 "Closed contract manually. contract_id={}",
1256 contract.accepted_contract.get_contract_id_string()
1257 );
1258 self.store.update_contract(&closed_contract).await?;
1259 Ok(closed_contract)
1260 }
1261 Err(e) => {
1262 log_error!(
1263 self.logger,
1264 "Failed to close contract. contract_id={} error={}",
1265 contract.accepted_contract.get_contract_id_string(),
1266 e.to_string()
1267 );
1268 Err(e)
1269 }
1270 }
1271 } else {
1272 Err(Error::InvalidState(
1273 "Attestations did not match contract infos".to_string(),
1274 ))
1275 }
1276 }
1277
1278 #[tracing::instrument(skip_all, level = "debug")]
1279 async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1280 for c in self.store.get_preclosed_contracts().await? {
1281 if let Err(e) = self.check_preclosed_contract(&c).await {
1282 log_error!(
1283 self.logger,
1284 "Error checking pre-closed contract. contract_id={} error={}",
1285 c.signed_contract.accepted_contract.get_contract_id_string(),
1286 e
1287 )
1288 }
1289 }
1290
1291 Ok(())
1292 }
1293
1294 #[tracing::instrument(skip_all, level = "debug")]
1295 async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1296 let broadcasted_txid = contract.signed_cet.compute_txid();
1297 let confirmations = self
1298 .blockchain
1299 .get_transaction_confirmations(&broadcasted_txid)
1300 .await?;
1301 log_debug!(
1302 self.logger,
1303 "Checking pre-closed contract. broadcasted_txid={} contract_id={} confirmations={}",
1304 broadcasted_txid.to_string(),
1305 contract
1306 .signed_contract
1307 .accepted_contract
1308 .get_contract_id_string(),
1309 confirmations
1310 );
1311 if confirmations >= *NB_CONFIRMATIONS {
1312 log_debug!(self.logger,
1313 "Pre-closed contract is fully confirmed. Moving to closed. broadcasted_txid={} contract_id={}",
1314 broadcasted_txid.to_string(),
1315 contract.signed_contract.accepted_contract.get_contract_id_string()
1316 );
1317 let pnl = contract
1318 .signed_contract
1319 .accepted_contract
1320 .compute_pnl(&contract.signed_cet);
1321
1322 let closed_contract = ClosedContract {
1323 attestations: contract.attestations.clone(),
1324 signed_cet: Some(contract.signed_cet.clone()),
1325 contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1326 temporary_contract_id: contract
1327 .signed_contract
1328 .accepted_contract
1329 .offered_contract
1330 .id,
1331 counter_party_id: contract
1332 .signed_contract
1333 .accepted_contract
1334 .offered_contract
1335 .counter_party,
1336 funding_txid: contract
1337 .signed_contract
1338 .accepted_contract
1339 .dlc_transactions
1340 .fund
1341 .compute_txid(),
1342 pnl,
1343 signed_contract: contract.signed_contract.clone(),
1344 };
1345 self.store
1346 .update_contract(&Contract::Closed(closed_contract))
1347 .await?;
1348 }
1349
1350 Ok(())
1351 }
1352
1353 #[tracing::instrument(skip_all, level = "debug")]
1354 async fn close_contract(
1355 &self,
1356 contract: &SignedContract,
1357 signed_cet: Transaction,
1358 attestations: Vec<OracleAttestation>,
1359 ) -> Result<Contract, Error> {
1360 let confirmations = self
1361 .blockchain
1362 .get_transaction_confirmations(&signed_cet.compute_txid())
1363 .await?;
1364
1365 if confirmations < 1 {
1366 log_info!(
1367 self.logger,
1368 "Broadcasting signed CET. txid={} contract_id={}",
1369 signed_cet.compute_txid().to_string(),
1370 contract.accepted_contract.get_contract_id_string()
1371 );
1372 self.blockchain.send_transaction(&signed_cet).await?;
1377
1378 let preclosed_contract = PreClosedContract {
1379 signed_contract: contract.clone(),
1380 attestations: Some(attestations),
1381 signed_cet,
1382 };
1383
1384 return Ok(Contract::PreClosed(preclosed_contract));
1385 } else if confirmations < *NB_CONFIRMATIONS {
1386 log_debug!(self.logger,
1387 "Found a confirmed but not fully confirmed pending close. Moving to preclosed. txid={} contract_id={}",
1388 signed_cet.compute_txid().to_string(),
1389 contract.accepted_contract.get_contract_id_string()
1390 );
1391 let preclosed_contract = PreClosedContract {
1392 signed_contract: contract.clone(),
1393 attestations: Some(attestations),
1394 signed_cet,
1395 };
1396
1397 return Ok(Contract::PreClosed(preclosed_contract));
1398 }
1399
1400 let closed_contract = ClosedContract {
1401 attestations: Some(attestations.to_vec()),
1402 pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1403 signed_cet: Some(signed_cet),
1404 contract_id: contract.accepted_contract.get_contract_id(),
1405 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1406 funding_txid: contract
1407 .accepted_contract
1408 .dlc_transactions
1409 .fund
1410 .compute_txid(),
1411 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1412 signed_contract: contract.clone(),
1413 };
1414
1415 Ok(Contract::Closed(closed_contract))
1416 }
1417
1418 #[tracing::instrument(skip_all, level = "debug")]
1420 async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1421 if contract
1423 .accepted_contract
1424 .dlc_transactions
1425 .refund
1426 .lock_time
1427 .to_consensus_u32() as u64
1428 <= self.time.unix_time_now()
1429 {
1430 log_debug!(self.logger,
1431 "Refund locktime has passed. Attempting to broadcast refund. refund_txid={} contract_id={}",
1432 contract.accepted_contract.dlc_transactions.refund.compute_txid().to_string(),
1433 contract.accepted_contract.get_contract_id_string()
1434 );
1435 let accepted_contract = &contract.accepted_contract;
1436 let refund = accepted_contract.dlc_transactions.refund.clone();
1437 let confirmations = self
1438 .blockchain
1439 .get_transaction_confirmations(&refund.compute_txid())
1440 .await?;
1441 if confirmations == 0 {
1442 log_debug!(self.logger,
1443 "Refund transaction has not been broadcast yet. Sending transaction txid={} contract_id={}",
1444 refund.compute_txid().to_string(),
1445 contract.accepted_contract.get_contract_id_string()
1446 );
1447 let offer = &contract.accepted_contract.offered_contract;
1448 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1449 let refund = crate::contract_updater::get_signed_refund(
1450 &self.secp,
1451 contract,
1452 &signer,
1453 &self.logger,
1454 )?;
1455 self.blockchain.send_transaction(&refund).await?;
1456 }
1457
1458 self.store
1459 .update_contract(&Contract::Refunded(contract.clone()))
1460 .await?;
1461 }
1462
1463 Ok(())
1464 }
1465
1466 #[tracing::instrument(skip_all, level = "debug")]
1469 pub async fn on_counterparty_close(
1470 &mut self,
1471 contract: &SignedContract,
1472 closing_tx: Transaction,
1473 confirmations: u32,
1474 ) -> Result<Contract, Error> {
1475 if !closing_tx.input.iter().any(|i| {
1477 i.previous_output
1478 == contract
1479 .accepted_contract
1480 .dlc_transactions
1481 .get_fund_outpoint()
1482 }) {
1483 log_error!(
1484 self.logger,
1485 "Closing tx does not spend the funding tx. txid={} contract_id={}",
1486 closing_tx.compute_txid().to_string(),
1487 contract.accepted_contract.get_contract_id_string()
1488 );
1489 return Err(Error::InvalidParameters(
1490 "Closing tx does not spend the funding tx".to_string(),
1491 ));
1492 }
1493
1494 if contract
1496 .accepted_contract
1497 .dlc_transactions
1498 .refund
1499 .compute_txid()
1500 == closing_tx.compute_txid()
1501 {
1502 log_debug!(
1503 self.logger,
1504 "Closing tx is the refund tx. Moving to refunded. txid={} contract_id={}",
1505 closing_tx.compute_txid().to_string(),
1506 contract.accepted_contract.get_contract_id_string()
1507 );
1508 let refunded = Contract::Refunded(contract.clone());
1509 self.store.update_contract(&refunded).await?;
1510 return Ok(refunded);
1511 }
1512
1513 let contract = if confirmations < *NB_CONFIRMATIONS {
1514 log_info!(self.logger,
1515 "Closing transaction is not fully confirmed. Moving to preclosed. txid={} contract_id={}",
1516 closing_tx.compute_txid().to_string(),
1517 contract.accepted_contract.get_contract_id_string()
1518 );
1519 Contract::PreClosed(PreClosedContract {
1520 signed_contract: contract.clone(),
1521 attestations: None, signed_cet: closing_tx,
1523 })
1524 } else {
1525 log_info!(
1526 self.logger,
1527 "Closing transaction is fully confirmed. Moving to closed. txid={} contract_id={}",
1528 closing_tx.compute_txid().to_string(),
1529 contract.accepted_contract.get_contract_id_string()
1530 );
1531 Contract::Closed(ClosedContract {
1532 attestations: None, pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1534 signed_cet: Some(closing_tx),
1535 contract_id: contract.accepted_contract.get_contract_id(),
1536 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1537 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1538 funding_txid: contract
1539 .accepted_contract
1540 .dlc_transactions
1541 .fund
1542 .compute_txid(),
1543 signed_contract: contract.clone(),
1544 })
1545 };
1546
1547 self.store.update_contract(&contract).await?;
1548
1549 Ok(contract)
1550 }
1551
1552 #[tracing::instrument(skip_all, level = "debug")]
1556 pub async fn cooperative_close_contract(
1557 &self,
1558 contract_id: &ContractId,
1559 counter_payout: Amount,
1560 ) -> Result<(CloseDlc, PublicKey), Error> {
1561 let signed_contract =
1562 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1563
1564 let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1565 &self.secp,
1566 &signed_contract,
1567 counter_payout,
1568 &self.signer_provider,
1569 &self.logger,
1570 )?;
1571
1572 let mut updated_dlc_transactions =
1574 signed_contract.accepted_contract.dlc_transactions.clone();
1575 updated_dlc_transactions
1576 .pending_close_txs
1577 .push(close_tx.clone());
1578 log_debug!(
1579 self.logger,
1580 "Created updated contract with pending close transaction. close_txid={} contract_id={}",
1581 close_tx.compute_txid().to_string(),
1582 contract_id.to_lower_hex_string()
1583 );
1584
1585 let updated_accepted_contract = AcceptedContract {
1586 dlc_transactions: updated_dlc_transactions,
1587 ..signed_contract.accepted_contract.clone()
1588 };
1589
1590 let updated_signed_contract = SignedContract {
1591 accepted_contract: updated_accepted_contract,
1592 ..signed_contract.clone()
1593 };
1594
1595 self.store
1597 .update_contract(&Contract::Confirmed(updated_signed_contract))
1598 .await?;
1599
1600 let counter_party = signed_contract
1601 .accepted_contract
1602 .offered_contract
1603 .counter_party;
1604
1605 Ok((close_message, counter_party))
1606 }
1607
1608 #[tracing::instrument(skip_all, level = "debug")]
1611 pub async fn accept_cooperative_close(
1612 &self,
1613 contract_id: &ContractId,
1614 close_message: &CloseDlc,
1615 ) -> Result<(), Error> {
1616 let signed_contract =
1617 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1618
1619 let close_tx = crate::contract_updater::complete_cooperative_close(
1620 &self.secp,
1621 &signed_contract,
1622 close_message,
1623 &self.signer_provider,
1624 &self.logger,
1625 )?;
1626
1627 log_debug!(
1628 self.logger,
1629 "Completed cooperative close. close_txid={} contract_id={}",
1630 close_tx.compute_txid().to_string(),
1631 contract_id.to_lower_hex_string()
1632 );
1633 self.blockchain.send_transaction(&close_tx).await?;
1635
1636 let preclosed_contract = PreClosedContract {
1638 signed_contract,
1639 attestations: None,
1640 signed_cet: close_tx,
1641 };
1642
1643 self.store
1644 .update_contract(&Contract::PreClosed(preclosed_contract))
1645 .await?;
1646
1647 Ok(())
1648 }
1649}
1650
1651impl<
1652 W: Deref,
1653 SP: Deref,
1654 B: Deref,
1655 S: Deref,
1656 O: Deref,
1657 T: Deref,
1658 F: Deref,
1659 X: ContractSigner,
1660 L: Deref,
1661 > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
1662where
1663 W::Target: Wallet,
1664 SP::Target: ContractSignerProvider<Signer = X>,
1665 B::Target: Blockchain,
1666 S::Target: Storage,
1667 O::Target: Oracle,
1668 T::Target: Time,
1669 F::Target: FeeEstimator,
1670 L::Target: Logger,
1671{
1672 pub async fn offer_channel(
1675 &self,
1676 contract_input: &ContractInput,
1677 counter_party: PublicKey,
1678 ) -> Result<OfferChannel, Error> {
1679 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1680
1681 let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1682 &self.secp,
1683 contract_input,
1684 &counter_party,
1685 &oracle_announcements,
1686 CET_NSEQUENCE,
1687 REFUND_DELAY,
1688 &self.wallet,
1689 &self.signer_provider,
1690 &self.blockchain,
1691 &self.time,
1692 crate::utils::get_new_temporary_id(),
1693 )
1694 .await?;
1695
1696 let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1697
1698 self.store
1699 .upsert_channel(
1700 Channel::Offered(offered_channel),
1701 Some(Contract::Offered(offered_contract)),
1702 )
1703 .await?;
1704
1705 Ok(msg)
1706 }
1707
1708 pub async fn reject_channel(
1711 &self,
1712 channel_id: &ChannelId,
1713 ) -> Result<(Reject, PublicKey), Error> {
1714 let offered_channel =
1715 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1716
1717 if offered_channel.is_offer_party {
1718 return Err(Error::InvalidState(
1719 "Cannot reject channel initiated by us.".to_string(),
1720 ));
1721 }
1722
1723 let offered_contract = get_contract_in_state!(
1724 self,
1725 &offered_channel.offered_contract_id,
1726 Offered,
1727 None as Option<PublicKey>
1728 )?;
1729
1730 let counterparty = offered_channel.counter_party;
1731 self.store
1732 .upsert_channel(
1733 Channel::Cancelled(offered_channel),
1734 Some(Contract::Rejected(offered_contract)),
1735 )
1736 .await?;
1737
1738 let msg = Reject {
1739 channel_id: *channel_id,
1740 };
1741 Ok((msg, counterparty))
1742 }
1743
1744 pub async fn accept_channel(
1748 &self,
1749 channel_id: &ChannelId,
1750 ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1751 let offered_channel =
1752 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1753
1754 if offered_channel.is_offer_party {
1755 return Err(Error::InvalidState(
1756 "Cannot accept channel initiated by us.".to_string(),
1757 ));
1758 }
1759
1760 let offered_contract = get_contract_in_state!(
1761 self,
1762 &offered_channel.offered_contract_id,
1763 Offered,
1764 None as Option<PublicKey>
1765 )?;
1766
1767 let (accepted_channel, accepted_contract, accept_channel) =
1768 crate::channel_updater::accept_channel_offer(
1769 &self.secp,
1770 &offered_channel,
1771 &offered_contract,
1772 &self.wallet,
1773 &self.signer_provider,
1774 &self.blockchain,
1775 )
1776 .await?;
1777
1778 self.wallet.import_address(&Address::p2wsh(
1779 &accepted_contract.dlc_transactions.funding_script_pubkey,
1780 self.blockchain.get_network()?,
1781 ))?;
1782
1783 let channel_id = accepted_channel.channel_id;
1784 let contract_id = accepted_contract.get_contract_id();
1785 let counter_party = accepted_contract.offered_contract.counter_party;
1786
1787 self.store
1788 .upsert_channel(
1789 Channel::Accepted(accepted_channel),
1790 Some(Contract::Accepted(accepted_contract)),
1791 )
1792 .await?;
1793
1794 Ok((accept_channel, channel_id, contract_id, counter_party))
1795 }
1796
1797 pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1799 let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1800
1801 self.force_close_channel_internal(channel, true).await
1802 }
1803
1804 pub async fn settle_offer(
1808 &self,
1809 channel_id: &ChannelId,
1810 counter_payout: Amount,
1811 ) -> Result<(SettleOffer, PublicKey), Error> {
1812 let mut signed_channel =
1813 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1814
1815 let msg = crate::channel_updater::settle_channel_offer(
1816 &self.secp,
1817 &mut signed_channel,
1818 counter_payout,
1819 PEER_TIMEOUT,
1820 &self.signer_provider,
1821 &self.time,
1822 )?;
1823
1824 let counter_party = signed_channel.counter_party;
1825
1826 self.store
1827 .upsert_channel(Channel::Signed(signed_channel), None)
1828 .await?;
1829
1830 Ok((msg, counter_party))
1831 }
1832
1833 pub async fn accept_settle_offer(
1836 &self,
1837 channel_id: &ChannelId,
1838 ) -> Result<(SettleAccept, PublicKey), Error> {
1839 let mut signed_channel =
1840 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1841
1842 let msg = crate::channel_updater::settle_channel_accept(
1843 &self.secp,
1844 &mut signed_channel,
1845 CET_NSEQUENCE,
1846 0,
1847 PEER_TIMEOUT,
1848 &self.signer_provider,
1849 &self.time,
1850 &self.chain_monitor,
1851 )
1852 .await?;
1853
1854 let counter_party = signed_channel.counter_party;
1855
1856 self.store
1857 .upsert_channel(Channel::Signed(signed_channel), None)
1858 .await?;
1859
1860 Ok((msg, counter_party))
1861 }
1862
1863 pub async fn renew_offer(
1867 &self,
1868 channel_id: &ChannelId,
1869 counter_payout: Amount,
1870 contract_input: &ContractInput,
1871 ) -> Result<(RenewOffer, PublicKey), Error> {
1872 let mut signed_channel =
1873 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1874
1875 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1876
1877 let (msg, offered_contract) = crate::channel_updater::renew_offer(
1878 &self.secp,
1879 &mut signed_channel,
1880 contract_input,
1881 oracle_announcements,
1882 counter_payout,
1883 REFUND_DELAY,
1884 PEER_TIMEOUT,
1885 CET_NSEQUENCE,
1886 &self.signer_provider,
1887 &self.time,
1888 )?;
1889
1890 let counter_party = offered_contract.counter_party;
1891
1892 self.store
1893 .upsert_channel(
1894 Channel::Signed(signed_channel),
1895 Some(Contract::Offered(offered_contract)),
1896 )
1897 .await?;
1898
1899 Ok((msg, counter_party))
1900 }
1901
1902 pub async fn accept_renew_offer(
1906 &self,
1907 channel_id: &ChannelId,
1908 ) -> Result<(RenewAccept, PublicKey), Error> {
1909 let mut signed_channel =
1910 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1911 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1912 Error::InvalidState("Expected to have a contract id but did not.".to_string())
1913 })?;
1914
1915 let offered_contract = get_contract_in_state!(
1916 self,
1917 &offered_contract_id,
1918 Offered,
1919 None as Option<PublicKey>
1920 )?;
1921
1922 let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1923 &self.secp,
1924 &mut signed_channel,
1925 &offered_contract,
1926 CET_NSEQUENCE,
1927 PEER_TIMEOUT,
1928 &self.signer_provider,
1929 &self.time,
1930 )?;
1931
1932 let counter_party = signed_channel.counter_party;
1933
1934 self.store
1935 .upsert_channel(
1936 Channel::Signed(signed_channel),
1937 Some(Contract::Accepted(accepted_contract)),
1938 )
1939 .await?;
1940
1941 Ok((msg, counter_party))
1942 }
1943
1944 pub async fn reject_renew_offer(
1948 &self,
1949 channel_id: &ChannelId,
1950 ) -> Result<(Reject, PublicKey), Error> {
1951 let mut signed_channel =
1952 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1953 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1954 Error::InvalidState(
1955 "Expected to be in a state with an associated contract id but was not.".to_string(),
1956 )
1957 })?;
1958
1959 let offered_contract = get_contract_in_state!(
1960 self,
1961 &offered_contract_id,
1962 Offered,
1963 None as Option<PublicKey>
1964 )?;
1965
1966 let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1967
1968 let counter_party = signed_channel.counter_party;
1969
1970 self.store
1971 .upsert_channel(
1972 Channel::Signed(signed_channel),
1973 Some(Contract::Rejected(offered_contract)),
1974 )
1975 .await?;
1976
1977 Ok((reject_msg, counter_party))
1978 }
1979
1980 pub async fn reject_settle_offer(
1984 &self,
1985 channel_id: &ChannelId,
1986 ) -> Result<(Reject, PublicKey), Error> {
1987 let mut signed_channel =
1988 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1989
1990 let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1991
1992 let counter_party = signed_channel.counter_party;
1993
1994 self.store
1995 .upsert_channel(Channel::Signed(signed_channel), None)
1996 .await?;
1997
1998 Ok((msg, counter_party))
1999 }
2000
2001 pub async fn offer_collaborative_close(
2006 &self,
2007 channel_id: &ChannelId,
2008 counter_payout: Amount,
2009 additional_inputs: Vec<OutPoint>,
2010 ) -> Result<CollaborativeCloseOffer, Error> {
2011 let mut signed_channel =
2012 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2013
2014 let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
2015 &self.secp,
2016 &mut signed_channel,
2017 counter_payout,
2018 additional_inputs,
2019 &self.signer_provider,
2020 &self.time,
2021 )?;
2022
2023 self.chain_monitor.lock().await.add_tx(
2024 close_tx.compute_txid(),
2025 ChannelInfo {
2026 channel_id: *channel_id,
2027 tx_type: TxType::CollaborativeClose,
2028 },
2029 );
2030
2031 self.store
2032 .upsert_channel(Channel::Signed(signed_channel), None)
2033 .await?;
2034 self.store
2035 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2036 .await?;
2037
2038 Ok(msg)
2039 }
2040
2041 pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
2044 let signed_channel =
2045 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2046
2047 let closed_contract = if let Some(SignedChannelState::Established {
2048 signed_contract_id,
2049 ..
2050 }) = &signed_channel.roll_back_state
2051 {
2052 let counter_payout = get_signed_channel_state!(
2053 signed_channel,
2054 CollaborativeCloseOffered,
2055 counter_payout
2056 )?;
2057 Some(
2058 self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
2059 .await?,
2060 )
2061 } else {
2062 None
2063 };
2064
2065 let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
2066 &self.secp,
2067 &signed_channel,
2068 &self.signer_provider,
2069 )?;
2070
2071 self.blockchain.send_transaction(&close_tx).await?;
2072
2073 self.store.upsert_channel(closed_channel, None).await?;
2074
2075 if let Some(closed_contract) = closed_contract {
2076 self.store
2077 .update_contract(&Contract::Closed(closed_contract))
2078 .await?;
2079 }
2080
2081 Ok(())
2082 }
2083
2084 async fn try_finalize_closing_established_channel(
2085 &self,
2086 signed_channel: SignedChannel,
2087 ) -> Result<(), Error> {
2088 let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
2089 signed_channel,
2090 Closing,
2091 buffer_transaction,
2092 contract_id,
2093 is_initiator
2094 )?;
2095
2096 if self
2097 .blockchain
2098 .get_transaction_confirmations(&buffer_tx.compute_txid())
2099 .await?
2100 >= CET_NSEQUENCE
2101 {
2102 log_info!(
2103 self.logger,
2104 "Buffer transaction for contract {} has enough confirmations to spend from it",
2105 serialize_hex(&contract_id)
2106 );
2107
2108 let confirmed_contract =
2109 get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
2110
2111 let (contract_info, adaptor_info, attestations) = self
2112 .get_closable_contract_info(&confirmed_contract)
2113 .await
2114 .ok_or_else(|| {
2115 Error::InvalidState("Could not get information to close contract".to_string())
2116 })?;
2117
2118 let (signed_cet, closed_channel) =
2119 crate::channel_updater::finalize_unilateral_close_settled_channel(
2120 &self.secp,
2121 &signed_channel,
2122 &confirmed_contract,
2123 contract_info,
2124 &attestations,
2125 adaptor_info,
2126 &self.signer_provider,
2127 is_initiator,
2128 )?;
2129
2130 let closed_contract = self
2131 .close_contract(
2132 &confirmed_contract,
2133 signed_cet,
2134 attestations.iter().map(|x| &x.1).cloned().collect(),
2135 )
2136 .await?;
2137
2138 self.chain_monitor
2139 .lock()
2140 .await
2141 .cleanup_channel(signed_channel.channel_id);
2142
2143 self.store
2144 .upsert_channel(closed_channel, Some(closed_contract))
2145 .await?;
2146 }
2147
2148 Ok(())
2149 }
2150
2151 async fn on_offer_channel(
2152 &self,
2153 offer_channel: &OfferChannel,
2154 counter_party: PublicKey,
2155 ) -> Result<(), Error> {
2156 offer_channel.validate(
2157 &self.secp,
2158 REFUND_DELAY,
2159 REFUND_DELAY * 2,
2160 CET_NSEQUENCE,
2161 CET_NSEQUENCE * 2,
2162 )?;
2163
2164 let keys_id = self
2165 .signer_provider
2166 .derive_signer_key_id(false, offer_channel.temporary_contract_id);
2167 let (channel, contract) =
2168 OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
2169
2170 contract.validate()?;
2171
2172 if self
2173 .store
2174 .get_channel(&channel.temporary_channel_id)
2175 .await?
2176 .is_some()
2177 {
2178 return Err(Error::InvalidParameters(
2179 "Channel with identical idea already in store".to_string(),
2180 ));
2181 }
2182
2183 self.store
2184 .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
2185 .await?;
2186
2187 Ok(())
2188 }
2189
2190 async fn on_accept_channel(
2191 &self,
2192 accept_channel: &AcceptChannel,
2193 peer_id: &PublicKey,
2194 ) -> Result<SignChannel, Error> {
2195 let offered_channel = get_channel_in_state!(
2196 self,
2197 &accept_channel.temporary_channel_id,
2198 Offered,
2199 Some(*peer_id)
2200 )?;
2201 let offered_contract = get_contract_in_state!(
2202 self,
2203 &offered_channel.offered_contract_id,
2204 Offered,
2205 Some(*peer_id)
2206 )?;
2207
2208 let (signed_channel, signed_contract, sign_channel) = {
2209 let res = crate::channel_updater::verify_and_sign_accepted_channel(
2210 &self.secp,
2211 &offered_channel,
2212 &offered_contract,
2213 accept_channel,
2214 CET_NSEQUENCE,
2216 &self.wallet,
2217 &self.signer_provider,
2218 &self.store,
2219 &self.chain_monitor,
2220 &self.logger,
2221 )
2222 .await;
2223
2224 match res {
2225 Ok(res) => res,
2226 Err(e) => {
2227 let channel = crate::channel::FailedAccept {
2228 temporary_channel_id: accept_channel.temporary_channel_id,
2229 error_message: format!("Error validating accept channel: {e}"),
2230 accept_message: accept_channel.clone(),
2231 counter_party: *peer_id,
2232 };
2233 self.store
2234 .upsert_channel(Channel::FailedAccept(channel), None)
2235 .await?;
2236 return Err(e);
2237 }
2238 }
2239 };
2240
2241 self.wallet.import_address(&Address::p2wsh(
2242 &signed_contract
2243 .accepted_contract
2244 .dlc_transactions
2245 .funding_script_pubkey,
2246 self.blockchain.get_network()?,
2247 ))?;
2248
2249 if let SignedChannelState::Established {
2250 buffer_transaction, ..
2251 } = &signed_channel.state
2252 {
2253 self.chain_monitor.lock().await.add_tx(
2254 buffer_transaction.compute_txid(),
2255 ChannelInfo {
2256 channel_id: signed_channel.channel_id,
2257 tx_type: TxType::BufferTx,
2258 },
2259 );
2260 } else {
2261 unreachable!();
2262 }
2263
2264 self.store
2265 .upsert_channel(
2266 Channel::Signed(signed_channel),
2267 Some(Contract::Signed(signed_contract)),
2268 )
2269 .await?;
2270
2271 self.store
2272 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2273 .await?;
2274
2275 Ok(sign_channel)
2276 }
2277
2278 async fn on_sign_channel(
2279 &self,
2280 sign_channel: &SignChannel,
2281 peer_id: &PublicKey,
2282 ) -> Result<(), Error> {
2283 let accepted_channel =
2284 get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
2285 let accepted_contract = get_contract_in_state!(
2286 self,
2287 &accepted_channel.accepted_contract_id,
2288 Accepted,
2289 Some(*peer_id)
2290 )?;
2291
2292 let (signed_channel, signed_contract, signed_fund_tx) = {
2293 let res = verify_signed_channel(
2294 &self.secp,
2295 &accepted_channel,
2296 &accepted_contract,
2297 sign_channel,
2298 &self.wallet,
2299 &self.chain_monitor,
2300 &self.store,
2301 &self.signer_provider,
2302 &self.logger,
2303 )
2304 .await;
2305
2306 match res {
2307 Ok(res) => res,
2308 Err(e) => {
2309 let channel = crate::channel::FailedSign {
2310 channel_id: sign_channel.channel_id,
2311 error_message: format!("Error validating accept channel: {e}"),
2312 sign_message: sign_channel.clone(),
2313 counter_party: *peer_id,
2314 };
2315 self.store
2316 .upsert_channel(Channel::FailedSign(channel), None)
2317 .await?;
2318 return Err(e);
2319 }
2320 }
2321 };
2322
2323 if let SignedChannelState::Established {
2324 buffer_transaction, ..
2325 } = &signed_channel.state
2326 {
2327 self.chain_monitor.lock().await.add_tx(
2328 buffer_transaction.compute_txid(),
2329 ChannelInfo {
2330 channel_id: signed_channel.channel_id,
2331 tx_type: TxType::BufferTx,
2332 },
2333 );
2334 } else {
2335 unreachable!();
2336 }
2337
2338 self.blockchain.send_transaction(&signed_fund_tx).await?;
2339
2340 self.store
2341 .upsert_channel(
2342 Channel::Signed(signed_channel),
2343 Some(Contract::Signed(signed_contract)),
2344 )
2345 .await?;
2346 self.store
2347 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2348 .await?;
2349
2350 Ok(())
2351 }
2352
2353 async fn on_settle_offer(
2354 &self,
2355 settle_offer: &SettleOffer,
2356 peer_id: &PublicKey,
2357 ) -> Result<Option<Reject>, Error> {
2358 let mut signed_channel =
2359 get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2360
2361 if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2362 return Ok(Some(Reject {
2363 channel_id: settle_offer.channel_id,
2364 }));
2365 }
2366
2367 crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2368
2369 self.store
2370 .upsert_channel(Channel::Signed(signed_channel), None)
2371 .await?;
2372
2373 Ok(None)
2374 }
2375
2376 async fn on_settle_accept(
2377 &self,
2378 settle_accept: &SettleAccept,
2379 peer_id: &PublicKey,
2380 ) -> Result<SettleConfirm, Error> {
2381 let mut signed_channel =
2382 get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2383
2384 let msg = crate::channel_updater::settle_channel_confirm(
2385 &self.secp,
2386 &mut signed_channel,
2387 settle_accept,
2388 CET_NSEQUENCE,
2389 0,
2390 PEER_TIMEOUT,
2391 &self.signer_provider,
2392 &self.time,
2393 &self.chain_monitor,
2394 )
2395 .await?;
2396
2397 self.store
2398 .upsert_channel(Channel::Signed(signed_channel), None)
2399 .await?;
2400
2401 Ok(msg)
2402 }
2403
2404 async fn on_settle_confirm(
2405 &self,
2406 settle_confirm: &SettleConfirm,
2407 peer_id: &PublicKey,
2408 ) -> Result<SettleFinalize, Error> {
2409 let mut signed_channel =
2410 get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2411 let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2412 let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2413 signed_channel,
2414 Established,
2415 buffer_transaction,
2416 own_buffer_adaptor_signature,
2417 is_offer,
2418 signed_contract_id
2419 )?;
2420
2421 let prev_buffer_txid = prev_buffer_tx.compute_txid();
2422 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2423 let is_offer = *is_offer;
2424 let signed_contract_id = *signed_contract_id;
2425
2426 let msg = crate::channel_updater::settle_channel_finalize(
2427 &self.secp,
2428 &mut signed_channel,
2429 settle_confirm,
2430 &self.signer_provider,
2431 )?;
2432
2433 self.chain_monitor.lock().await.add_tx(
2434 prev_buffer_txid,
2435 ChannelInfo {
2436 channel_id: signed_channel.channel_id,
2437 tx_type: TxType::Revoked {
2438 update_idx: signed_channel.update_idx + 1,
2439 own_adaptor_signature: own_buffer_adaptor_signature,
2440 is_offer,
2441 revoked_tx_type: RevokedTxType::Buffer,
2442 },
2443 },
2444 );
2445
2446 let closed_contract = Contract::Closed(
2447 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2448 .await?,
2449 );
2450
2451 self.store
2452 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2453 .await?;
2454 self.store
2455 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2456 .await?;
2457
2458 Ok(msg)
2459 }
2460
2461 async fn on_settle_finalize(
2462 &self,
2463 settle_finalize: &SettleFinalize,
2464 peer_id: &PublicKey,
2465 ) -> Result<(), Error> {
2466 let mut signed_channel =
2467 get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2468 let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2469 let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2470 signed_channel,
2471 Established,
2472 buffer_transaction,
2473 own_buffer_adaptor_signature,
2474 is_offer,
2475 signed_contract_id
2476 )?;
2477
2478 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2479 let is_offer = *is_offer;
2480 let buffer_txid = buffer_tx.compute_txid();
2481 let signed_contract_id = *signed_contract_id;
2482
2483 crate::channel_updater::settle_channel_on_finalize(
2484 &self.secp,
2485 &mut signed_channel,
2486 settle_finalize,
2487 )?;
2488
2489 self.chain_monitor.lock().await.add_tx(
2490 buffer_txid,
2491 ChannelInfo {
2492 channel_id: signed_channel.channel_id,
2493 tx_type: TxType::Revoked {
2494 update_idx: signed_channel.update_idx + 1,
2495 own_adaptor_signature: own_buffer_adaptor_signature,
2496 is_offer,
2497 revoked_tx_type: RevokedTxType::Buffer,
2498 },
2499 },
2500 );
2501
2502 let closed_contract = Contract::Closed(
2503 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2504 .await?,
2505 );
2506 self.store
2507 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2508 .await?;
2509 self.store
2510 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2511 .await?;
2512
2513 Ok(())
2514 }
2515
2516 async fn on_renew_offer(
2517 &self,
2518 renew_offer: &RenewOffer,
2519 peer_id: &PublicKey,
2520 ) -> Result<Option<Reject>, Error> {
2521 let mut signed_channel =
2522 get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2523
2524 if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2526 if is_offer {
2527 return Ok(Some(Reject {
2528 channel_id: renew_offer.channel_id,
2529 }));
2530 }
2531 }
2532
2533 let offered_contract = crate::channel_updater::on_renew_offer(
2534 &mut signed_channel,
2535 renew_offer,
2536 PEER_TIMEOUT,
2537 &self.time,
2538 )?;
2539
2540 self.store.create_contract(&offered_contract).await?;
2541 self.store
2542 .upsert_channel(Channel::Signed(signed_channel), None)
2543 .await?;
2544
2545 Ok(None)
2546 }
2547
2548 async fn on_renew_accept(
2549 &self,
2550 renew_accept: &RenewAccept,
2551 peer_id: &PublicKey,
2552 ) -> Result<RenewConfirm, Error> {
2553 let mut signed_channel =
2554 get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2555 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2556 Error::InvalidState(
2557 "Expected to be in a state with an associated contract id but was not.".to_string(),
2558 )
2559 })?;
2560
2561 let offered_contract =
2562 get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2563
2564 let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2565 &self.secp,
2566 renew_accept,
2567 &mut signed_channel,
2568 &offered_contract,
2569 CET_NSEQUENCE,
2570 PEER_TIMEOUT,
2571 &self.wallet,
2572 &self.signer_provider,
2573 &self.time,
2574 &self.store,
2575 &self.logger,
2576 )
2577 .await?;
2578
2579 self.store
2581 .upsert_channel(
2582 Channel::Signed(signed_channel),
2583 Some(Contract::Confirmed(signed_contract)),
2584 )
2585 .await?;
2586
2587 Ok(msg)
2588 }
2589
2590 async fn on_renew_confirm(
2591 &self,
2592 renew_confirm: &RenewConfirm,
2593 peer_id: &PublicKey,
2594 ) -> Result<RenewFinalize, Error> {
2595 let mut signed_channel =
2596 get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2597 let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2598 let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2599 Error::InvalidState(
2600 "Expected to be in a state with an associated contract id but was not.".to_string(),
2601 )
2602 })?;
2603
2604 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2605 .roll_back_state
2606 .as_ref()
2607 .expect("to have a rollback state")
2608 {
2609 SignedChannelState::Established {
2610 own_buffer_adaptor_signature,
2611 buffer_transaction,
2612 signed_contract_id,
2613 ..
2614 } => {
2615 let closed_contract = Contract::Closed(
2616 self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2617 .await?,
2618 );
2619 (
2620 TxType::Revoked {
2621 update_idx: signed_channel.update_idx,
2622 own_adaptor_signature: *own_buffer_adaptor_signature,
2623 is_offer: false,
2624 revoked_tx_type: RevokedTxType::Buffer,
2625 },
2626 buffer_transaction.compute_txid(),
2627 Some(closed_contract),
2628 )
2629 }
2630 SignedChannelState::Settled {
2631 settle_tx,
2632 own_settle_adaptor_signature,
2633 ..
2634 } => (
2635 TxType::Revoked {
2636 update_idx: signed_channel.update_idx,
2637 own_adaptor_signature: *own_settle_adaptor_signature,
2638 is_offer: false,
2639 revoked_tx_type: RevokedTxType::Settle,
2640 },
2641 settle_tx.compute_txid(),
2642 None,
2643 ),
2644 s => {
2645 return Err(Error::InvalidState(format!(
2646 "Expected rollback state Established or Revoked but found {s:?}"
2647 )))
2648 }
2649 };
2650 let accepted_contract =
2651 get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2652
2653 let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2654 &self.secp,
2655 &mut signed_channel,
2656 &accepted_contract,
2657 renew_confirm,
2658 PEER_TIMEOUT,
2659 &self.time,
2660 &self.wallet,
2661 &self.signer_provider,
2662 &self.chain_monitor,
2663 &self.store,
2664 &self.logger,
2665 )
2666 .await?;
2667
2668 self.chain_monitor.lock().await.add_tx(
2669 prev_tx_id,
2670 ChannelInfo {
2671 channel_id: signed_channel.channel_id,
2672 tx_type,
2673 },
2674 );
2675
2676 self.store
2678 .upsert_channel(
2679 Channel::Signed(signed_channel),
2680 Some(Contract::Confirmed(signed_contract)),
2681 )
2682 .await?;
2683
2684 self.store
2685 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2686 .await?;
2687
2688 if let Some(closed_contract) = closed_contract {
2689 self.store.update_contract(&closed_contract).await?;
2690 }
2691
2692 Ok(msg)
2693 }
2694
2695 async fn on_renew_finalize(
2696 &self,
2697 renew_finalize: &RenewFinalize,
2698 peer_id: &PublicKey,
2699 ) -> Result<RenewRevoke, Error> {
2700 let mut signed_channel =
2701 get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2702 let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2703
2704 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2705 .roll_back_state
2706 .as_ref()
2707 .expect("to have a rollback state")
2708 {
2709 SignedChannelState::Established {
2710 own_buffer_adaptor_signature,
2711 buffer_transaction,
2712 signed_contract_id,
2713 ..
2714 } => {
2715 let closed_contract = self
2716 .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2717 .await?;
2718 (
2719 TxType::Revoked {
2720 update_idx: signed_channel.update_idx,
2721 own_adaptor_signature: *own_buffer_adaptor_signature,
2722 is_offer: false,
2723 revoked_tx_type: RevokedTxType::Buffer,
2724 },
2725 buffer_transaction.compute_txid(),
2726 Some(Contract::Closed(closed_contract)),
2727 )
2728 }
2729 SignedChannelState::Settled {
2730 settle_tx,
2731 own_settle_adaptor_signature,
2732 ..
2733 } => (
2734 TxType::Revoked {
2735 update_idx: signed_channel.update_idx,
2736 own_adaptor_signature: *own_settle_adaptor_signature,
2737 is_offer: false,
2738 revoked_tx_type: RevokedTxType::Settle,
2739 },
2740 settle_tx.compute_txid(),
2741 None,
2742 ),
2743 s => {
2744 return Err(Error::InvalidState(format!(
2745 "Expected rollback state of Established or Settled but was {s:?}"
2746 )))
2747 }
2748 };
2749
2750 let msg = crate::channel_updater::renew_channel_on_finalize(
2751 &self.secp,
2752 &mut signed_channel,
2753 renew_finalize,
2754 &self.signer_provider,
2755 )?;
2756
2757 self.chain_monitor.lock().await.add_tx(
2758 prev_tx_id,
2759 ChannelInfo {
2760 channel_id: signed_channel.channel_id,
2761 tx_type,
2762 },
2763 );
2764
2765 let buffer_tx =
2766 get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2767
2768 self.chain_monitor.lock().await.add_tx(
2769 buffer_tx.compute_txid(),
2770 ChannelInfo {
2771 channel_id: signed_channel.channel_id,
2772 tx_type: TxType::BufferTx,
2773 },
2774 );
2775
2776 self.store
2777 .upsert_channel(Channel::Signed(signed_channel), None)
2778 .await?;
2779 self.store
2780 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2781 .await?;
2782
2783 if let Some(closed_contract) = closed_contract {
2784 self.store.update_contract(&closed_contract).await?;
2785 }
2786
2787 Ok(msg)
2788 }
2789
2790 async fn on_renew_revoke(
2791 &self,
2792 renew_revoke: &RenewRevoke,
2793 peer_id: &PublicKey,
2794 ) -> Result<(), Error> {
2795 let mut signed_channel =
2796 get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2797
2798 crate::channel_updater::renew_channel_on_revoke(
2799 &self.secp,
2800 &mut signed_channel,
2801 renew_revoke,
2802 )?;
2803
2804 self.store
2805 .upsert_channel(Channel::Signed(signed_channel), None)
2806 .await?;
2807
2808 Ok(())
2809 }
2810
2811 async fn on_collaborative_close_offer(
2812 &self,
2813 close_offer: &CollaborativeCloseOffer,
2814 peer_id: &PublicKey,
2815 ) -> Result<(), Error> {
2816 let mut signed_channel =
2817 get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2818
2819 crate::channel_updater::on_collaborative_close_offer(
2820 &mut signed_channel,
2821 close_offer,
2822 PEER_TIMEOUT,
2823 &self.time,
2824 )?;
2825
2826 self.store
2827 .upsert_channel(Channel::Signed(signed_channel), None)
2828 .await?;
2829
2830 Ok(())
2831 }
2832
2833 async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2834 let channel = self.store.get_channel(&reject.channel_id).await?;
2835
2836 if let Some(channel) = channel {
2837 if channel.get_counter_party_id() != *counter_party {
2838 return Err(Error::InvalidParameters(format!(
2839 "Peer {:02x?} is not involved with {} {:02x?}.",
2840 counter_party,
2841 stringify!(Channel),
2842 channel.get_id()
2843 )));
2844 }
2845 match channel {
2846 Channel::Offered(offered_channel) => {
2847 let offered_contract = get_contract_in_state!(
2848 self,
2849 &offered_channel.offered_contract_id,
2850 Offered,
2851 None as Option<PublicKey>
2852 )?;
2853 let utxos = offered_contract
2854 .funding_inputs
2855 .iter()
2856 .map(|funding_input| {
2857 let txid = Transaction::consensus_decode(
2858 &mut funding_input.prev_tx.as_slice(),
2859 )
2860 .expect("Transaction Decode Error")
2861 .compute_txid();
2862 let vout = funding_input.prev_tx_vout;
2863 OutPoint { txid, vout }
2864 })
2865 .collect::<Vec<_>>();
2866
2867 self.wallet.unreserve_utxos(&utxos)?;
2868
2869 self.store
2871 .upsert_channel(
2872 Channel::Cancelled(offered_channel),
2873 Some(Contract::Rejected(offered_contract)),
2874 )
2875 .await?;
2876 }
2877 Channel::Signed(mut signed_channel) => {
2878 let contract = match signed_channel.state {
2879 SignedChannelState::RenewOffered {
2880 offered_contract_id,
2881 ..
2882 } => {
2883 let offered_contract = get_contract_in_state!(
2884 self,
2885 &offered_contract_id,
2886 Offered,
2887 None::<PublicKey>
2888 )?;
2889 Some(Contract::Rejected(offered_contract))
2890 }
2891 _ => None,
2892 };
2893
2894 crate::channel_updater::on_reject(&mut signed_channel)?;
2895
2896 self.store
2897 .upsert_channel(Channel::Signed(signed_channel), contract)
2898 .await?;
2899 }
2900 channel => {
2901 return Err(Error::InvalidState(format!(
2902 "Not in a state adequate to receive a reject message. {channel:?}"
2903 )))
2904 }
2905 }
2906 } else {
2907 log_warn!(
2908 self.logger,
2909 "Couldn't find rejected dlc channel with id: {}",
2910 reject.channel_id.to_lower_hex_string()
2911 );
2912 }
2913
2914 Ok(())
2915 }
2916
2917 async fn channel_checks(&self) -> Result<(), Error> {
2918 let established_closing_channels = self
2919 .store
2920 .get_signed_channels(Some(SignedChannelStateType::Closing))
2921 .await?;
2922
2923 for channel in established_closing_channels {
2924 if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2925 log_error!(
2926 self.logger,
2927 "Error trying to close established channel: {}",
2928 e
2929 );
2930 }
2931 }
2932
2933 if let Err(e) = self.check_for_timed_out_channels().await {
2934 log_error!(self.logger, "Error checking timed out channels {}", e);
2935 }
2936 self.check_for_watched_tx().await
2937 }
2938
/// Invokes the `check_for_timed_out_channels!` macro once for each of the
/// intermediate renew and settle channel states listed below, then returns
/// `Ok(())`.
///
/// NOTE(review): the macro is defined outside this section; presumably it
/// handles channels whose counter party did not answer within the peer
/// timeout while in the given state, and may return early on error — confirm
/// against the macro definition.
async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
    check_for_timed_out_channels!(self, RenewOffered);
    check_for_timed_out_channels!(self, RenewAccepted);
    check_for_timed_out_channels!(self, RenewConfirmed);
    check_for_timed_out_channels!(self, SettledOffered);
    check_for_timed_out_channels!(self, SettledAccepted);
    check_for_timed_out_channels!(self, SettledConfirmed);

    Ok(())
}
2949
2950 pub(crate) async fn process_watched_txs(
2951 &self,
2952 watched_txs: Vec<(Transaction, ChannelInfo)>,
2953 ) -> Result<(), Error> {
2954 for (tx, channel_info) in watched_txs {
2955 let mut signed_channel = match get_channel_in_state!(
2956 self,
2957 &channel_info.channel_id,
2958 Signed,
2959 None as Option<PublicKey>
2960 ) {
2961 Ok(c) => c,
2962 Err(e) => {
2963 log_error!(
2964 self.logger,
2965 "Could not retrieve channel {:?}: {}",
2966 channel_info.channel_id,
2967 e
2968 );
2969 continue;
2970 }
2971 };
2972
2973 let persist = match channel_info.tx_type {
2974 TxType::BufferTx => {
2975 let contract_id = signed_channel
2982 .get_contract_id()
2983 .expect("to have a contract id");
2984 let mut state = SignedChannelState::Closing {
2985 buffer_transaction: tx.clone(),
2986 is_initiator: false,
2987 contract_id,
2988 keys_id: signed_channel.keys_id().ok_or_else(|| {
2989 Error::InvalidState("Expected to have keys_id.".to_string())
2990 })?,
2991 };
2992 std::mem::swap(&mut signed_channel.state, &mut state);
2993
2994 signed_channel.roll_back_state = Some(state);
2995
2996 self.store
2997 .upsert_channel(Channel::Signed(signed_channel), None)
2998 .await?;
2999
3000 false
3001 }
3002 TxType::Revoked {
3003 update_idx,
3004 own_adaptor_signature,
3005 is_offer,
3006 revoked_tx_type,
3007 } => {
3008 let secret = signed_channel
3009 .counter_party_commitment_secrets
3010 .get_secret(update_idx)
3011 .expect("to be able to retrieve the per update secret");
3012 let counter_per_update_secret = SecretKey::from_slice(&secret)
3013 .expect("to be able to parse the counter per update secret.");
3014
3015 let per_update_seed_pk = signed_channel.own_per_update_seed;
3016
3017 let per_update_seed_sk = self
3018 .signer_provider
3019 .get_secret_key_for_pubkey(&per_update_seed_pk)?;
3020
3021 let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
3022 per_update_seed_sk.as_ref(),
3023 update_idx,
3024 ))
3025 .expect("a valid secret key.");
3026
3027 let per_update_point =
3028 PublicKey::from_secret_key(&self.secp, &per_update_secret);
3029
3030 let own_revocation_params = signed_channel.own_points.get_revokable_params(
3031 &self.secp,
3032 &signed_channel.counter_points.revocation_basepoint,
3033 &per_update_point,
3034 );
3035
3036 let counter_per_update_point =
3037 PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
3038
3039 let base_own_sk = self
3040 .signer_provider
3041 .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
3042
3043 let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
3044
3045 let counter_revocation_params =
3046 signed_channel.counter_points.get_revokable_params(
3047 &self.secp,
3048 &signed_channel.own_points.revocation_basepoint,
3049 &counter_per_update_point,
3050 );
3051
3052 let witness = if signed_channel.own_params.fund_pubkey
3053 < signed_channel.counter_params.fund_pubkey
3054 {
3055 tx.input[0].witness.to_vec().remove(1)
3056 } else {
3057 tx.input[0].witness.to_vec().remove(2)
3058 };
3059
3060 let sig_data = witness
3061 .iter()
3062 .take(witness.len() - 1)
3063 .cloned()
3064 .collect::<Vec<_>>();
3065 let own_sig = Signature::from_der(&sig_data)?;
3066
3067 let counter_sk = own_adaptor_signature.recover(
3068 &self.secp,
3069 &own_sig,
3070 &counter_revocation_params.publish_pk.inner,
3071 )?;
3072
3073 let own_revocation_base_secret =
3074 &self.signer_provider.get_secret_key_for_pubkey(
3075 &signed_channel.own_points.revocation_basepoint,
3076 )?;
3077
3078 let counter_revocation_sk = derive_private_revocation_key(
3079 &self.secp,
3080 &counter_per_update_secret,
3081 own_revocation_base_secret,
3082 );
3083
3084 let (offer_params, accept_params) = if is_offer {
3085 (&own_revocation_params, &counter_revocation_params)
3086 } else {
3087 (&counter_revocation_params, &own_revocation_params)
3088 };
3089
3090 let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
3091 lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
3092 ) / 250)
3093 .into();
3094
3095 let signed_tx = match revoked_tx_type {
3096 RevokedTxType::Buffer => {
3097 ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
3098 &self.secp,
3099 offer_params,
3100 accept_params,
3101 &own_sk,
3102 &counter_sk,
3103 &counter_revocation_sk,
3104 &tx,
3105 &self.wallet.get_new_address().await?,
3106 0,
3107 fee_rate_per_vb,
3108 )?
3109 }
3110 RevokedTxType::Settle => {
3111 ddk_dlc::channel::create_and_sign_punish_settle_transaction(
3112 &self.secp,
3113 offer_params,
3114 accept_params,
3115 &own_sk,
3116 &counter_sk,
3117 &counter_revocation_sk,
3118 &tx,
3119 &self.wallet.get_new_address().await?,
3120 CET_NSEQUENCE,
3121 0,
3122 fee_rate_per_vb,
3123 is_offer,
3124 )?
3125 }
3126 };
3127
3128 self.blockchain.send_transaction(&signed_tx).await?;
3129
3130 let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
3131 counter_party: signed_channel.counter_party,
3132 temporary_channel_id: signed_channel.temporary_channel_id,
3133 channel_id: signed_channel.channel_id,
3134 punish_txid: signed_tx.compute_txid(),
3135 });
3136
3137 self.chain_monitor
3140 .lock()
3141 .await
3142 .cleanup_channel(signed_channel.channel_id);
3143 self.store.upsert_channel(closed_channel, None).await?;
3144 true
3145 }
3146 TxType::CollaborativeClose => {
3147 if let Some(SignedChannelState::Established {
3148 signed_contract_id, ..
3149 }) = signed_channel.roll_back_state
3150 {
3151 let counter_payout = get_signed_channel_state!(
3152 signed_channel,
3153 CollaborativeCloseOffered,
3154 counter_payout
3155 )?;
3156 let closed_contract = self
3157 .get_collaboratively_closed_contract(
3158 &signed_contract_id,
3159 *counter_payout,
3160 false,
3161 )
3162 .await?;
3163 self.store
3164 .update_contract(&Contract::Closed(closed_contract))
3165 .await?;
3166 }
3167 let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
3168 counter_party: signed_channel.counter_party,
3169 temporary_channel_id: signed_channel.temporary_channel_id,
3170 channel_id: signed_channel.channel_id,
3171 });
3172 self.chain_monitor
3173 .lock()
3174 .await
3175 .cleanup_channel(signed_channel.channel_id);
3176 self.store.upsert_channel(closed_channel, None).await?;
3177 true
3178 }
3179 TxType::SettleTx => {
3180 let closed_channel = Channel::CounterClosed(ClosedChannel {
3181 counter_party: signed_channel.counter_party,
3182 temporary_channel_id: signed_channel.temporary_channel_id,
3183 channel_id: signed_channel.channel_id,
3184 });
3185 self.chain_monitor
3186 .lock()
3187 .await
3188 .cleanup_channel(signed_channel.channel_id);
3189 self.store.upsert_channel(closed_channel, None).await?;
3190 true
3191 }
3192 TxType::Cet => {
3193 let contract_id = signed_channel.get_contract_id();
3194 let closed_channel = {
3195 match &signed_channel.state {
3196 SignedChannelState::Closing { is_initiator, .. } => {
3197 if *is_initiator {
3198 Channel::Closed(ClosedChannel {
3199 counter_party: signed_channel.counter_party,
3200 temporary_channel_id: signed_channel.temporary_channel_id,
3201 channel_id: signed_channel.channel_id,
3202 })
3203 } else {
3204 Channel::CounterClosed(ClosedChannel {
3205 counter_party: signed_channel.counter_party,
3206 temporary_channel_id: signed_channel.temporary_channel_id,
3207 channel_id: signed_channel.channel_id,
3208 })
3209 }
3210 }
3211 _ => {
3212 log_error!(self.logger, "Saw spending of buffer transaction without being in closing state");
3213 Channel::Closed(ClosedChannel {
3214 counter_party: signed_channel.counter_party,
3215 temporary_channel_id: signed_channel.temporary_channel_id,
3216 channel_id: signed_channel.channel_id,
3217 })
3218 }
3219 }
3220 };
3221
3222 self.chain_monitor
3223 .lock()
3224 .await
3225 .cleanup_channel(signed_channel.channel_id);
3226 let pre_closed_contract = futures::stream::iter(contract_id)
3227 .then(|contract_id| {
3228 let txn = tx.clone();
3229 async move {
3230 self.store.get_contract(&contract_id).await.map(|contract| {
3231 contract.and_then(|contract| match contract {
3232 Contract::Confirmed(signed_contract) => {
3233 Some(Contract::PreClosed(PreClosedContract {
3234 signed_contract,
3235 attestations: None,
3236 signed_cet: txn,
3237 }))
3238 }
3239 _ => None,
3240 })
3241 })
3242 }
3243 })
3244 .try_collect::<Vec<_>>()
3245 .await?
3246 .into_iter()
3247 .flatten()
3248 .next();
3249
3250 self.store
3251 .upsert_channel(closed_channel, pre_closed_contract)
3252 .await?;
3253
3254 true
3255 }
3256 };
3257
3258 if persist {
3259 self.store
3260 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3261 .await?;
3262 }
3263 }
3264 Ok(())
3265 }
3266
3267 async fn check_for_watched_tx(&self) -> Result<(), Error> {
3268 let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
3269
3270 self.process_watched_txs(confirmed_txs).await?;
3271
3272 self.store
3273 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3274 .await?;
3275
3276 Ok(())
3277 }
3278
3279 async fn force_close_channel_internal(
3280 &self,
3281 mut channel: SignedChannel,
3282 is_initiator: bool,
3283 ) -> Result<(), Error> {
3284 match &channel.state {
3285 SignedChannelState::Established {
3286 counter_buffer_adaptor_signature,
3287 buffer_transaction,
3288 ..
3289 } => {
3290 let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
3291 let buffer_transaction = buffer_transaction.clone();
3292 self.initiate_unilateral_close_established_channel(
3293 channel,
3294 is_initiator,
3295 counter_buffer_adaptor_signature,
3296 buffer_transaction,
3297 )
3298 .await
3299 }
3300 SignedChannelState::RenewFinalized {
3301 buffer_transaction,
3302 offer_buffer_adaptor_signature,
3303 ..
3304 } => {
3305 let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
3306 let buffer_transaction = buffer_transaction.clone();
3307 self.initiate_unilateral_close_established_channel(
3308 channel,
3309 is_initiator,
3310 offer_buffer_adaptor_signature,
3311 buffer_transaction,
3312 )
3313 .await
3314 }
3315 SignedChannelState::Settled { .. } => {
3316 self.close_settled_channel(channel, is_initiator).await
3317 }
3318 SignedChannelState::SettledOffered { .. }
3319 | SignedChannelState::SettledReceived { .. }
3320 | SignedChannelState::SettledAccepted { .. }
3321 | SignedChannelState::SettledConfirmed { .. }
3322 | SignedChannelState::RenewOffered { .. }
3323 | SignedChannelState::RenewAccepted { .. }
3324 | SignedChannelState::RenewConfirmed { .. }
3325 | SignedChannelState::CollaborativeCloseOffered { .. } => {
3326 channel.state = channel
3327 .roll_back_state
3328 .take()
3329 .expect("to have a rollback state");
3330 let channel_clone = channel.clone(); Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
3332 }
3333 SignedChannelState::Closing { .. } => Err(Error::InvalidState(
3334 "Channel is already closing.".to_string(),
3335 )),
3336 }
3337 }
3338
3339 async fn initiate_unilateral_close_established_channel(
3341 &self,
3342 mut signed_channel: SignedChannel,
3343 is_initiator: bool,
3344 buffer_adaptor_signature: EcdsaAdaptorSignature,
3345 buffer_transaction: Transaction,
3346 ) -> Result<(), Error> {
3347 let keys_id = signed_channel.keys_id().ok_or_else(|| {
3348 Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
3349 })?;
3350
3351 crate::channel_updater::initiate_unilateral_close_established_channel(
3352 &self.secp,
3353 &mut signed_channel,
3354 buffer_adaptor_signature,
3355 keys_id,
3356 buffer_transaction,
3357 &self.signer_provider,
3358 is_initiator,
3359 )?;
3360
3361 let buffer_transaction =
3362 get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3363
3364 self.blockchain.send_transaction(buffer_transaction).await?;
3365
3366 self.chain_monitor
3367 .lock()
3368 .await
3369 .remove_tx(&buffer_transaction.compute_txid());
3370
3371 self.store
3372 .upsert_channel(Channel::Signed(signed_channel), None)
3373 .await?;
3374
3375 self.store
3376 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3377 .await?;
3378
3379 Ok(())
3380 }
3381
3382 async fn close_settled_channel(
3384 &self,
3385 signed_channel: SignedChannel,
3386 is_initiator: bool,
3387 ) -> Result<(), Error> {
3388 let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3389 &self.secp,
3390 &signed_channel,
3391 &self.signer_provider,
3392 is_initiator,
3393 )?;
3394
3395 if self
3396 .blockchain
3397 .get_transaction_confirmations(&settle_tx.compute_txid())
3398 .await
3399 .unwrap_or(0)
3400 == 0
3401 {
3402 self.blockchain.send_transaction(&settle_tx).await?;
3403 }
3404
3405 self.chain_monitor
3406 .lock()
3407 .await
3408 .cleanup_channel(signed_channel.channel_id);
3409
3410 self.store.upsert_channel(closed_channel, None).await?;
3411
3412 Ok(())
3413 }
3414
3415 async fn get_collaboratively_closed_contract(
3416 &self,
3417 contract_id: &ContractId,
3418 payout: Amount,
3419 is_own_payout: bool,
3420 ) -> Result<ClosedContract, Error> {
3421 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3422 let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3423 contract
3424 .accepted_contract
3425 .offered_contract
3426 .offer_params
3427 .collateral
3428 } else {
3429 contract.accepted_contract.accept_params.collateral
3430 };
3431 let own_payout = if is_own_payout {
3432 payout
3433 } else {
3434 contract.accepted_contract.offered_contract.total_collateral - payout
3435 };
3436 let pnl =
3437 SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3438 Ok(ClosedContract {
3439 attestations: None,
3440 signed_cet: None,
3441 contract_id: *contract_id,
3442 temporary_contract_id: contract.accepted_contract.offered_contract.id,
3443 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3444 funding_txid: contract
3445 .accepted_contract
3446 .dlc_transactions
3447 .fund
3448 .compute_txid(),
3449 pnl,
3450 signed_contract: contract.clone(),
3451 })
3452 }
3453
3454 async fn oracle_announcements(
3455 &self,
3456 contract_input: &ContractInput,
3457 ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3458 let announcements = stream::iter(contract_input.contract_infos.iter())
3459 .map(|x| {
3460 let future = self.get_oracle_announcements(&x.oracles);
3461 async move {
3462 match future.await {
3463 Ok(result) => Ok(result),
3464 Err(e) => {
3465 log_error!(self.logger, "Failed to get oracle announcements: {}", e);
3466 Err(e)
3467 }
3468 }
3469 }
3470 })
3471 .collect::<FuturesUnordered<_>>()
3472 .await
3473 .try_collect::<Vec<_>>()
3474 .await?;
3475 Ok(announcements)
3476 }
3477}