Skip to main content

ddk_manager/
manager.rs

1//! # Manager: a component to create and update DLCs.
2
3use super::{
4    Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13    accepted_contract::AcceptedContract, contract_info::ContractInfo,
14    contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15    signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16    FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::hex::DisplayHex;
26use bitcoin::{Address, Amount, SignedAmount};
27use bitcoin::{OutPoint, Transaction};
28use ddk_messages::channel::{
29    AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
30    RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
31    SettleOffer, SignChannel,
32};
33use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
34use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
35use futures::stream;
36use futures::stream::FuturesUnordered;
37use futures::{StreamExt, TryStreamExt};
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40    build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use lightning::util::logger::Logger;
43use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
44use once_cell::sync::Lazy;
45use secp256k1_zkp::XOnlyPublicKey;
46use secp256k1_zkp::{
47    ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
48};
49use std::collections::HashMap;
50use std::ops::Deref;
51use std::string::ToString;
52use std::sync::Arc;
53use tokio::sync::Mutex;
54
55/// The number of confirmations required before moving the the confirmed state.
56pub const DEFAULT_NB_CONFIRMATIONS: u32 = 3;
57
58/// The number of confirmations required before moving the the confirmed state.
59/// Uses the NB_CONFIRMATIONS environment variable to set the value.
60static NB_CONFIRMATIONS: Lazy<u32> = Lazy::new(|| match std::env::var("NB_CONFIRMATIONS") {
61    Ok(val) => val.parse().unwrap_or(DEFAULT_NB_CONFIRMATIONS),
62    Err(_) => DEFAULT_NB_CONFIRMATIONS,
63});
64
65/// Automatically broadcast the refund transaction in `check_confirmed_contracts`
66static AUTOMATIC_REFUND: Lazy<bool> = Lazy::new(|| match std::env::var("AUTOMATIC_REFUND") {
67    Ok(val) => val.parse().unwrap_or(true),
68    Err(_) => true,
69});
70
/// The delay to set the refund value to.
// NOTE(review): 7 * 86_400 is the number of seconds in seven days, so the unit is
// presumably seconds; confirm against the code that consumes this value.
pub const REFUND_DELAY: u32 = 7 * 86_400;
/// The nSequence value used for CETs in DLC channels.
pub const CET_NSEQUENCE: u32 = 288;
/// Timeout in seconds when waiting for a peer's reply, after which a DLC channel
/// is force closed.
pub const PEER_TIMEOUT: u64 = 3_600;
78
/// Optional triple of a borrowed [`ContractInfo`], a borrowed [`AdaptorInfo`] and a
/// list of `(usize, OracleAttestation)` pairs.
// NOTE(review): the name suggests this is the data needed to close a contract, with
// the `usize` presumably being an oracle index. The use sites are outside this part
// of the file; confirm there.
79type ClosableContractInfo<'a> = Option<(
80    &'a ContractInfo,
81    &'a AdaptorInfo,
82    Vec<(usize, OracleAttestation)>,
83)>;
84
85/// Used to create and update DLCs.
///
/// Every collaborator is held through a `Deref` handle whose target implements the
/// trait named in the `where` clause: wallet (`W`), signer provider (`SP`),
/// blockchain access (`B`), storage (`S`), oracle client (`O`), time source (`T`),
/// fee estimator (`F`) and logger (`L`). `X` is the contract signer type produced
/// by the signer provider.
86#[derive(Debug)]
87pub struct Manager<
88    W: Deref,
89    SP: Deref,
90    B: Deref,
91    S: Deref,
92    O: Deref,
93    T: Deref,
94    F: Deref,
95    X: ContractSigner,
96    L: Deref,
97> where
98    W::Target: Wallet,
99    SP::Target: ContractSignerProvider<Signer = X>,
100    B::Target: Blockchain,
101    S::Target: Storage,
102    O::Target: Oracle,
103    T::Target: Time,
104    F::Target: FeeEstimator,
105    L::Target: Logger,
106{
    /// Oracle handles keyed by x-only public key; looked up with the public keys
    /// listed in an `OracleInput` when announcements are fetched.
107    oracles: HashMap<XOnlyPublicKey, O>,
    /// Wallet handle; passed to the contract updater functions and used to import
    /// funding addresses.
108    wallet: W,
    /// Provider of contract signers and signer key ids.
109    signer_provider: SP,
    /// Blockchain access: chain height, blocks, confirmations, network and
    /// transaction broadcast.
110    blockchain: B,
    /// Persistent storage for contracts, channels and the chain monitor.
111    store: S,
    /// Secp256k1 context handed to the contract updater functions.
112    secp: Secp256k1<All>,
    /// Chain monitor fed with new blocks by `periodic_chain_monitor`; guarded by an
    /// async mutex.
113    chain_monitor: Mutex<ChainMonitor>,
    /// Time source, e.g. for the current unix time used in channel timeout checks.
114    time: T,
    /// Fee estimator handle.
115    fee_estimator: F,
    /// Logger used by the `log_*!` macros.
116    logger: L,
117}
118
// Forwards to `get_object_in_state!` with `Contract` as the object enum and
// `get_contract` as the accessor name.
// NOTE(review): presumably yields the contract when it is in the `$variant` state and
// matches `$peer` when one is given; see `utils::get_object_in_state` to confirm.
macro_rules! get_contract_in_state {
    ($mgr: ident, $id: expr, $variant: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $variant, $peer, Contract, get_contract)
    }};
}
131
// Forwards to `get_object_in_state!` with `Channel` as the object enum and
// `get_channel` as the accessor name.
// NOTE(review): presumably yields the channel when it is in the `$variant` state and
// matches `$peer` when one is given; see `utils::get_object_in_state` to confirm.
macro_rules! get_channel_in_state {
    ($mgr: ident, $id: expr, $variant: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $variant, $peer, Channel, get_channel)
    }};
}
144
// Extracts the listed fields from the channel's `roll_back_state` when it holds the
// `SignedChannelState::$variant` variant, yielding them as a tuple of references.
// Any other value (including `None`) yields `Error::InvalidState`.
macro_rules! get_signed_channel_rollback_state {
    ($channel: ident, $variant: ident, $($member: ident),*) => {{
        if let Some(SignedChannelState::$variant { $($member,)* .. }) =
            $channel.roll_back_state.as_ref()
        {
            Ok(($($member,)*))
        } else {
            Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($variant),
                $channel.state
            )))
        }
    }};
}
153
// Loads every signed channel stored in the `$variant` state and force closes those
// whose `timeout` lies before the current unix time. A failing force close is only
// logged so that the remaining channels are still processed.
macro_rules! check_for_timed_out_channels {
    ($mgr: ident, $variant: ident) => {
        let candidates = $mgr
            .store
            .get_signed_channels(Some(SignedChannelStateType::$variant))
            .await?;

        for candidate in candidates {
            if let SignedChannelState::$variant { timeout, .. } = candidate.state {
                if timeout < $mgr.time.unix_time_now() {
                    if let Err(e) = $mgr.force_close_channel_internal(candidate, true).await {
                        log_error!($mgr.logger, "Error force closing channel. error={}", e);
                    }
                }
            }
        }
    };
}
176
177impl<
178        W: Deref,
179        SP: Deref,
180        B: Deref,
181        S: Deref,
182        O: Deref,
183        T: Deref,
184        F: Deref,
185        X: ContractSigner,
186        L: Deref,
187    > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
188where
189    W::Target: Wallet,
190    SP::Target: ContractSignerProvider<Signer = X>,
191    B::Target: Blockchain,
192    S::Target: Storage,
193    O::Target: Oracle,
194    T::Target: Time,
195    F::Target: FeeEstimator,
196    L::Target: Logger,
197{
198    /// Create a new Manager struct.
199    #[allow(clippy::too_many_arguments)]
200    #[tracing::instrument(skip_all)]
201    pub async fn new(
202        wallet: W,
203        signer_provider: SP,
204        blockchain: B,
205        store: S,
206        oracles: HashMap<XOnlyPublicKey, O>,
207        time: T,
208        fee_estimator: F,
209        logger: L,
210    ) -> Result<Self, Error> {
211        let init_height = blockchain.get_blockchain_height().await?;
212        let chain_monitor = Mutex::new(
213            store
214                .get_chain_monitor()
215                .await?
216                .unwrap_or(ChainMonitor::new(init_height)),
217        );
218
219        let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
220
221        log_info!(logger, "Manager initialized");
222
223        Ok(Manager {
224            secp: secp256k1_zkp::Secp256k1::new(),
225            wallet,
226            signer_provider,
227            blockchain,
228            store,
229            oracles,
230            time,
231            fee_estimator,
232            chain_monitor,
233            logger,
234        })
235    }
236
237    /// Get the store from the Manager to access contracts.
    ///
    /// Returns a shared reference to the storage handle held in the `store` field.
238    pub fn get_store(&self) -> &S {
239        &self.store
240    }
241
242    /// Function called to pass a DlcMessage to the Manager.
243    #[tracing::instrument(skip_all)]
244    pub async fn on_dlc_message(
245        &self,
246        msg: &DlcMessage,
247        counter_party: PublicKey,
248    ) -> Result<Option<DlcMessage>, Error> {
249        match msg {
250            DlcMessage::Offer(o) => {
251                log_debug!(self.logger, "Received offer message");
252                self.on_offer_message(o, counter_party).await?;
253                Ok(None)
254            }
255            DlcMessage::Accept(a) => {
256                log_debug!(self.logger, "Received accept message");
257                Ok(Some(self.on_accept_message(a, &counter_party).await?))
258            }
259            DlcMessage::Sign(s) => {
260                log_debug!(self.logger, "Received sign message");
261                self.on_sign_message(s, &counter_party).await?;
262                Ok(None)
263            }
264            DlcMessage::Close(c) => {
265                log_debug!(self.logger, "Received close message");
266                self.on_close_message(c, &counter_party).await?;
267                Ok(None)
268            }
269            DlcMessage::OfferChannel(o) => {
270                log_debug!(self.logger, "Received offer channel message");
271                self.on_offer_channel(o, counter_party).await?;
272                Ok(None)
273            }
274            DlcMessage::AcceptChannel(a) => {
275                log_debug!(self.logger, "Received accept channel message");
276                Ok(Some(DlcMessage::SignChannel(
277                    self.on_accept_channel(a, &counter_party).await?,
278                )))
279            }
280            DlcMessage::SignChannel(s) => {
281                log_debug!(self.logger, "Received sign channel message");
282                self.on_sign_channel(s, &counter_party).await?;
283                Ok(None)
284            }
285            DlcMessage::SettleOffer(s) => {
286                log_debug!(self.logger, "Received settle offer message");
287                match self.on_settle_offer(s, &counter_party).await? {
288                    Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
289                    None => Ok(None),
290                }
291            }
292            DlcMessage::SettleAccept(s) => {
293                log_debug!(self.logger, "Received settle accept message");
294                Ok(Some(DlcMessage::SettleConfirm(
295                    self.on_settle_accept(s, &counter_party).await?,
296                )))
297            }
298            DlcMessage::SettleConfirm(s) => {
299                log_debug!(self.logger, "Received settle confirm message");
300                Ok(Some(DlcMessage::SettleFinalize(
301                    self.on_settle_confirm(s, &counter_party).await?,
302                )))
303            }
304            DlcMessage::SettleFinalize(s) => {
305                log_debug!(self.logger, "Received settle finalize message");
306                self.on_settle_finalize(s, &counter_party).await?;
307                Ok(None)
308            }
309            DlcMessage::RenewOffer(r) => {
310                log_debug!(self.logger, "Received renew offer message");
311                match self.on_renew_offer(r, &counter_party).await? {
312                    Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
313                    None => Ok(None),
314                }
315            }
316            DlcMessage::RenewAccept(r) => {
317                log_debug!(self.logger, "Received renew accept message");
318                Ok(Some(DlcMessage::RenewConfirm(
319                    self.on_renew_accept(r, &counter_party).await?,
320                )))
321            }
322            DlcMessage::RenewConfirm(r) => {
323                log_debug!(self.logger, "Received renew confirm message");
324                Ok(Some(DlcMessage::RenewFinalize(
325                    self.on_renew_confirm(r, &counter_party).await?,
326                )))
327            }
328            DlcMessage::RenewFinalize(r) => {
329                log_debug!(self.logger, "Received renew finalize message");
330                let revoke = self.on_renew_finalize(r, &counter_party).await?;
331                Ok(Some(DlcMessage::RenewRevoke(revoke)))
332            }
333            DlcMessage::RenewRevoke(r) => {
334                log_debug!(self.logger, "Received renew revoke message");
335                self.on_renew_revoke(r, &counter_party).await?;
336                Ok(None)
337            }
338            DlcMessage::CollaborativeCloseOffer(c) => {
339                log_debug!(self.logger, "Received collaborative close offer message");
340                self.on_collaborative_close_offer(c, &counter_party).await?;
341                Ok(None)
342            }
343            DlcMessage::Reject(r) => {
344                log_debug!(self.logger, "Received reject message");
345                self.on_reject(r, &counter_party).await?;
346                Ok(None)
347            }
348        }
349    }
350
351    /// Create a new spliced offer
352    #[tracing::instrument(skip_all)]
353    pub async fn send_splice_offer(
354        &self,
355        contract_input: &ContractInput,
356        counter_party: PublicKey,
357        contract_id: &ContractId,
358    ) -> Result<OfferDlc, Error> {
359        let oracle_announcements = self.oracle_announcements(contract_input).await?;
360
361        self.send_splice_offer_with_announcements(
362            contract_input,
363            counter_party,
364            contract_id,
365            oracle_announcements,
366        )
367        .await
368    }
369
370    /// Creates a new offer DLC using an existing DLC as an input.
371    /// The new DLC MUST use a Confirmed contract as an input.
372    #[tracing::instrument(skip_all)]
373    pub async fn send_splice_offer_with_announcements(
374        &self,
375        contract_input: &ContractInput,
376        counter_party: PublicKey,
377        contract_id: &ContractId,
378        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
379    ) -> Result<OfferDlc, Error> {
380        let confirmed_contract =
381            get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
382
383        let dlc_input = confirmed_contract.get_dlc_input();
384
385        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
386            &self.secp,
387            contract_input,
388            oracle_announcements,
389            vec![dlc_input],
390            REFUND_DELAY,
391            &counter_party,
392            &self.wallet,
393            &self.blockchain,
394            &self.time,
395            &self.signer_provider,
396            &self.logger,
397        )
398        .await?;
399
400        offered_contract.validate()?;
401
402        self.store.create_contract(&offered_contract).await?;
403
404        Ok(offer_msg)
405    }
406
407    /// Function called to create a new DLC. The offered contract will be stored
408    /// and an OfferDlc message returned.
409    ///
410    /// This function will fetch the oracle announcements from the oracle.
411    #[tracing::instrument(skip_all)]
412    pub async fn send_offer(
413        &self,
414        contract_input: &ContractInput,
415        counter_party: PublicKey,
416    ) -> Result<OfferDlc, Error> {
417        // If the oracle announcement fails to retrieve, then log and continue.
418        let oracle_announcements = self.oracle_announcements(contract_input).await?;
419
420        self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
421            .await
422    }
423
424    /// Function called to create a new DLC. The offered contract will be stored
425    /// and an OfferDlc message returned.
426    ///
427    /// This function allows to pass the oracle announcements directly instead of
428    /// fetching them from the oracle.
429    #[tracing::instrument(skip_all)]
430    pub async fn send_offer_with_announcements(
431        &self,
432        contract_input: &ContractInput,
433        counter_party: PublicKey,
434        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
435    ) -> Result<OfferDlc, Error> {
436        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
437            &self.secp,
438            contract_input,
439            oracle_announcements,
440            vec![],
441            REFUND_DELAY,
442            &counter_party,
443            &self.wallet,
444            &self.blockchain,
445            &self.time,
446            &self.signer_provider,
447            &self.logger,
448        )
449        .await?;
450
451        offered_contract.validate()?;
452
453        self.store.create_contract(&offered_contract).await?;
454
455        Ok(offer_msg)
456    }
457
458    /// Function to call to accept a DLC for which an offer was received.
459    #[tracing::instrument(skip_all)]
460    pub async fn accept_contract_offer(
461        &self,
462        contract_id: &ContractId,
463    ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
464        let offered_contract =
465            get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
466
467        let counter_party = offered_contract.counter_party;
468
469        let (accepted_contract, accept_msg) = accept_contract(
470            &self.secp,
471            &offered_contract,
472            &self.wallet,
473            &self.signer_provider,
474            &self.blockchain,
475            &self.logger,
476        )
477        .await?;
478
479        self.wallet.import_address(&Address::p2wsh(
480            &accepted_contract.dlc_transactions.funding_script_pubkey,
481            self.blockchain.get_network()?,
482        ))?;
483
484        let contract_id = accepted_contract.get_contract_id();
485
486        self.store
487            .update_contract(&Contract::Accepted(accepted_contract))
488            .await?;
489
490        log_info!(
491            self.logger,
492            "Accepted and stored the contract. temp_id={} contract_id={}",
493            offered_contract.id.to_lower_hex_string(),
494            contract_id.to_lower_hex_string(),
495        );
496
497        Ok((contract_id, counter_party, accept_msg))
498    }
499
500    /// Function to update the state of the [`ChainMonitor`] with new
501    /// blocks.
502    ///
503    /// Consumers **MUST** call this periodically in order to
504    /// determine when pending transactions reach confirmation.
505    #[tracing::instrument(skip_all)]
506    pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
507        let cur_height = self.blockchain.get_blockchain_height().await?;
508        let last_height = self.chain_monitor.lock().await.last_height;
509
510        // TODO(luckysori): We could end up reprocessing a block at
511        // the same height if there is a reorg.
512        if cur_height < last_height {
513            return Err(Error::InvalidState(
514                "Current height is lower than last height.".to_string(),
515            ));
516        }
517
518        for height in last_height + 1..=cur_height {
519            let block = self.blockchain.get_block_at_height(height).await?;
520
521            self.chain_monitor
522                .lock()
523                .await
524                .process_block(&block, height);
525        }
526
527        Ok(())
528    }
529
530    /// Function to call to check the state of the currently executing DLCs and
531    /// update them if possible.
532    #[tracing::instrument(skip_all, level = "debug")]
533    pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
534        let signed_contracts = self.check_for_spliced_contract().await?;
535        self.check_signed_contracts(&signed_contracts).await?;
536        self.check_confirmed_contracts().await?;
537        self.check_preclosed_contracts().await?;
538
539        if check_channels {
540            self.channel_checks().await?;
541        }
542
543        Ok(())
544    }
545
546    /// Function to call to offer a DLC.
547    #[tracing::instrument(skip_all)]
548    pub async fn on_offer_message(
549        &self,
550        offered_message: &OfferDlc,
551        counter_party: PublicKey,
552    ) -> Result<(), Error> {
553        offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
554        let keys_id = self
555            .signer_provider
556            .derive_signer_key_id(false, offered_message.temporary_contract_id);
557        let contract: OfferedContract =
558            OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
559        contract.validate()?;
560
561        if self.store.get_contract(&contract.id).await?.is_some() {
562            return Err(Error::InvalidParameters(
563                "Contract with identical id already exists".to_string(),
564            ));
565        }
566
567        self.store.create_contract(&contract).await?;
568        log_info!(
569            self.logger,
570            "Created and stored the offered contract. temp_id={}",
571            contract.id.to_lower_hex_string(),
572        );
573        Ok(())
574    }
575
576    /// Function to call to close a DLC.
577    #[tracing::instrument(skip_all)]
578    pub async fn on_close_message(
579        &self,
580        close_msg: &CloseDlc,
581        counter_party: &PublicKey,
582    ) -> Result<(), Error> {
583        // Validate that the contract exists and is in the correct state
584        let signed_contract = get_contract_in_state!(
585            self,
586            &close_msg.contract_id,
587            Confirmed,
588            Some(*counter_party)
589        )?;
590
591        // Validate the close message by attempting to construct the close transaction
592        // This verifies the signature and transaction structure without broadcasting
593        let _close_tx = crate::contract_updater::complete_cooperative_close(
594            &self.secp,
595            &signed_contract,
596            close_msg,
597            &self.signer_provider,
598            &self.logger,
599        )?;
600
601        // Message is valid - the application layer should call accept_cooperative_close()
602        // if they want to accept the offered terms
603        Ok(())
604    }
605
606    /// Function to call to accept a DLC for which an offer was received.
607    #[tracing::instrument(skip_all)]
608    pub async fn on_accept_message(
609        &self,
610        accept_msg: &AcceptDlc,
611        counter_party: &PublicKey,
612    ) -> Result<DlcMessage, Error> {
613        let offered_contract = get_contract_in_state!(
614            self,
615            &accept_msg.temporary_contract_id,
616            Offered,
617            Some(*counter_party)
618        )?;
619
620        let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
621            &self.secp,
622            &offered_contract,
623            accept_msg,
624            &self.wallet,
625            &self.signer_provider,
626            &self.store,
627            &self.logger,
628        )
629        .await
630        {
631            Ok(contract) => contract,
632            Err(e) => {
633                log_error!(
634                    self.logger,
635                    "Error in on_accept_message. tmp_contract_id={} error={}",
636                    offered_contract.id.to_lower_hex_string(),
637                    e.to_string()
638                );
639                return self
640                    .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
641                    .await;
642            }
643        };
644
645        log_info!(
646            self.logger,
647            "Verified the accept message and signed the contract. temp_id={} contract_id={}",
648            offered_contract.id.to_lower_hex_string(),
649            signed_contract.accepted_contract.get_contract_id_string(),
650        );
651
652        let contract_id = signed_contract.accepted_contract.get_contract_id_string();
653
654        self.wallet.import_address(&Address::p2wsh(
655            &signed_contract
656                .accepted_contract
657                .dlc_transactions
658                .funding_script_pubkey,
659            self.blockchain.get_network()?,
660        ))?;
661
662        self.store
663            .update_contract(&Contract::Signed(signed_contract))
664            .await?;
665
666        log_info!(
667            self.logger,
668            "Accepted and signed the contract. temp_id={} contract_id={}",
669            offered_contract.id.to_lower_hex_string(),
670            contract_id,
671        );
672
673        Ok(DlcMessage::Sign(signed_msg))
674    }
675
676    /// Function to call to sign a DLC for which an accept was received.
677    #[tracing::instrument(skip_all)]
678    pub async fn on_sign_message(
679        &self,
680        sign_message: &SignDlc,
681        peer_id: &PublicKey,
682    ) -> Result<(), Error> {
683        let accepted_contract =
684            get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
685        let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
686            &self.secp,
687            &accepted_contract,
688            sign_message,
689            &self.wallet,
690            &self.store,
691            &self.signer_provider,
692            &self.logger,
693        )
694        .await
695        {
696            Ok(contract) => contract,
697            Err(e) => {
698                log_error!(
699                    self.logger,
700                    "Error in on_sign_message. contract_id={} error={}",
701                    accepted_contract.get_contract_id_string(),
702                    e.to_string()
703                );
704                return self
705                    .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
706                    .await;
707            }
708        };
709
710        self.store
711            .update_contract(&Contract::Signed(signed_contract.clone()))
712            .await?;
713
714        self.blockchain.send_transaction(&fund_tx).await?;
715
716        // Check if there are any DLC inputs in the funding inputs of the contract.
717        // If there are, mark the contract as pre-closed.
718        if accepted_contract
719            .offered_contract
720            .funding_inputs
721            .iter()
722            .any(|c| c.dlc_input.is_some())
723        {
724            let Some(dlc_input) = accepted_contract
725                .offered_contract
726                .funding_inputs
727                .iter()
728                .find(|c| c.dlc_input.is_some())
729            else {
730                return Ok(());
731            };
732            let preclosed_contract = get_contract_in_state!(
733                self,
734                &dlc_input.dlc_input.as_ref().unwrap().contract_id,
735                Confirmed,
736                None as Option<PublicKey>
737            )?;
738
739            let preclosed_contract = PreClosedContract {
740                signed_contract: preclosed_contract.clone(),
741                attestations: None,
742                signed_cet: signed_contract
743                    .accepted_contract
744                    .dlc_transactions
745                    .fund
746                    .clone(),
747            };
748
749            log_debug!(self.logger,
750                "Contract contains a DLC input. Marking the previous contract as pre-closed. dlc_input_contract_id={}", 
751                preclosed_contract.signed_contract.accepted_contract.get_contract_id_string()
752            );
753
754            self.store
755                .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
756                .await?;
757        }
758
759        Ok(())
760    }
761
762    #[tracing::instrument(skip_all, level = "debug")]
763    async fn get_oracle_announcements(
764        &self,
765        oracle_inputs: &OracleInput,
766    ) -> Result<Vec<OracleAnnouncement>, Error> {
767        let mut announcements = Vec::new();
768        for pubkey in &oracle_inputs.public_keys {
769            let oracle = self
770                .oracles
771                .get(pubkey)
772                .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
773            let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
774            announcements.push(announcement);
775        }
776
777        Ok(announcements)
778    }
779
780    async fn sign_fail_on_error<R>(
781        &self,
782        accepted_contract: AcceptedContract,
783        sign_message: SignDlc,
784        e: Error,
785    ) -> Result<R, Error> {
786        log_error!(
787            self.logger,
788            "Error in on_sign_message marking contract as failed sign. contract_id={} error={}",
789            accepted_contract.get_contract_id_string(),
790            e.to_string()
791        );
792        self.store
793            .update_contract(&Contract::FailedSign(FailedSignContract {
794                accepted_contract,
795                sign_message,
796                error_message: e.to_string(),
797            }))
798            .await?;
799        Err(e)
800    }
801
802    async fn accept_fail_on_error<R>(
803        &self,
804        offered_contract: OfferedContract,
805        accept_message: AcceptDlc,
806        e: Error,
807    ) -> Result<R, Error> {
808        log_error!(
809            self.logger,
810            "Error in on_accept_message marking contract as failed accept. contract_id={} error={}",
811            offered_contract.id.to_lower_hex_string(),
812            e.to_string()
813        );
814        self.store
815            .update_contract(&Contract::FailedAccept(FailedAcceptContract {
816                offered_contract,
817                accept_message,
818                error_message: e.to_string(),
819            }))
820            .await?;
821        Err(e)
822    }
823
824    #[tracing::instrument(skip_all, level = "debug")]
825    async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
826        let confirmations = self
827            .blockchain
828            .get_transaction_confirmations(
829                &contract
830                    .accepted_contract
831                    .dlc_transactions
832                    .fund
833                    .compute_txid(),
834            )
835            .await?;
836        if confirmations >= *NB_CONFIRMATIONS {
837            log_info!(
838                self.logger,
839                "Marking signed contract as confirmed. confirmations={} contract_id={}",
840                confirmations,
841                contract.accepted_contract.get_contract_id_string(),
842            );
843            self.store
844                .update_contract(&Contract::Confirmed(contract.clone()))
845                .await?;
846        } else {
847            log_debug!(self.logger,
848                "Not enough confirmations to mark contract as confirmed. confirmations={} required={} contract_id={}", 
849                confirmations,
850                *NB_CONFIRMATIONS,
851                contract.accepted_contract.get_contract_id_string(),
852            );
853        }
854        Ok(())
855    }
856
857    #[tracing::instrument(skip_all, level = "debug")]
858    async fn check_signed_contracts(
859        &self,
860        signed_contracts: &[SignedContract],
861    ) -> Result<(), Error> {
862        for c in signed_contracts {
863            if let Err(e) = self.check_signed_contract(c).await {
864                log_error!(
865                    self.logger,
866                    "Error checking signed contract. contract_id={} error={}",
867                    c.accepted_contract.get_contract_id_string(),
868                    e.to_string()
869                )
870            }
871        }
872
873        Ok(())
874    }
875
876    #[tracing::instrument(skip_all, level = "debug")]
877    async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
878        let contracts = self.get_store().get_signed_contracts().await?;
879        for contract in &contracts {
880            let dlc_input = contract
881                .accepted_contract
882                .offered_contract
883                .funding_inputs
884                .iter()
885                .find(|d| d.dlc_input.is_some());
886
887            let Some(funding_input) = dlc_input else {
888                continue;
889            };
890
891            let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
892
893            let confirmed_contract_in_splice = match get_contract_in_state!(
894                self,
895                &contract_id,
896                Confirmed,
897                None as Option<PublicKey>
898            ) {
899                Ok(c) => Ok(c),
900                Err(Error::InvalidState(e)) => {
901                    log_trace!(self.logger,
902                        "The previouse contract referenced in a splice transaction is in an unexpected state. contract_id={} error={}", 
903                        contract_id.to_lower_hex_string(), e.to_string(),
904                    );
905                    continue;
906                }
907                Err(e) => {
908                    log_debug!(self.logger,
909                        "The previouse contract referenced in a splice transaction failed to retrieve. contract_id={} error={}", 
910                        contract_id.to_lower_hex_string(), e.to_string(),
911                    );
912                    Err(e)
913                }
914            }?;
915
916            let preclosed_contract = PreClosedContract {
917                signed_contract: confirmed_contract_in_splice.clone(),
918                attestations: None,
919                signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
920            };
921
922            log_debug!(self.logger,
923                "The previous contract that was spliced is now closed because the splice contract is confirmed. contract_id={}", 
924                contract_id.to_lower_hex_string(),
925            );
926
927            self.store
928                .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
929                .await?;
930        }
931        Ok(contracts)
932    }
933
934    #[tracing::instrument(skip_all, level = "debug")]
935    async fn check_confirmed_contracts(&self) -> Result<(), Error> {
936        for c in self.store.get_confirmed_contracts().await? {
937            // Confirmed contracts from channel are processed in channel specific methods.
938            if c.channel_id.is_some() {
939                continue;
940            }
941            if let Err(e) = self.check_confirmed_contract(&c).await {
942                log_error!(
943                    self.logger,
944                    "Error checking confirmed contract. contract_id={} error={}",
945                    c.accepted_contract.get_contract_id_string(),
946                    e.to_string()
947                )
948            }
949        }
950
951        Ok(())
952    }
953
954    #[tracing::instrument(skip_all, level = "debug")]
955    async fn get_closable_contract_info<'a>(
956        &'a self,
957        contract: &'a SignedContract,
958    ) -> ClosableContractInfo<'a> {
959        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
960        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
961        for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
962            log_debug!(
963                self.logger,
964                "Checking contract for oracle maturation. contract_id={}",
965                contract.accepted_contract.get_contract_id_string()
966            );
967            let matured: Vec<_> = contract_info
968                .oracle_announcements
969                .iter()
970                .filter(|x| {
971                    (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
972                })
973                .enumerate()
974                .collect();
975            if matured.len() >= contract_info.threshold {
976                let attestations = stream::iter(matured.iter())
977                    .map(|(i, announcement)| async move {
978                        log_debug!(self.logger,
979                            "Oracle announcement for contract is matured. Getting attestations. contract_id={} event_id={}", 
980                            contract.accepted_contract.get_contract_id_string(),
981                            announcement.oracle_event.event_id
982                        );
983                        // First try to get the oracle
984                        let oracle = match self.oracles.get(&announcement.oracle_public_key) {
985                            Some(oracle) => oracle,
986                            None => {
987                                log_debug!(self.logger,
988                                    "Oracle not found. pubkey={}. contract_id={} event_id={}", 
989                                    announcement.oracle_public_key,
990                                    contract.accepted_contract.get_contract_id_string(),
991                                    announcement.oracle_event.event_id
992                                );
993                                return None;
994                            }
995                        };
996                        // Then try to get the attestation
997                        let attestation = match oracle
998                            .get_attestation(&announcement.oracle_event.event_id)
999                            .await
1000                        {
1001                            Ok(attestation) => attestation,
1002                            Err(_) => {
1003                                // log_error!(self.logger,
1004                                //     "Attestation not found for event. pubkey={} event_id={} error={}",
1005                                //     announcement.oracle_public_key,
1006                                //     announcement.oracle_event.event_id,
1007                                //     e.to_string()
1008                                // );
1009                                return None;
1010                            }
1011                        };
1012                        // Validate the attestation
1013                        if let Err(e) = attestation.validate(&self.secp, announcement) {
1014                            log_error!(self.logger,
1015                                "Oracle attestation is not valid. pubkey={} event_id={}, error={}",
1016                                announcement.oracle_public_key,
1017                                announcement.oracle_event.event_id,
1018                                e.to_string()
1019                            );
1020                            return None;
1021                        }
1022                        log_info!(self.logger,
1023                            "Retrieved a valid attestation. pubkey={} event_id={} outcomes={:?}", 
1024                            announcement.oracle_public_key,
1025                            announcement.oracle_event.event_id,
1026                            attestation.outcomes
1027                        );
1028                        Some((*i, attestation))
1029                    })
1030                    .collect::<FuturesUnordered<_>>()
1031                    .await
1032                    .filter_map(|result| async move { result }) // Filter out None values
1033                    .collect::<Vec<_>>()
1034                    .await;
1035                if attestations.len() >= contract_info.threshold {
1036                    log_info!(self.logger,
1037                        "Found enough attestations to close contract. contract_id={} attestations={}", 
1038                        contract.accepted_contract.get_contract_id_string(),
1039                        attestations.len()
1040                    );
1041                    return Some((contract_info, adaptor_info, attestations));
1042                }
1043            }
1044        }
1045        None
1046    }
1047
1048    #[tracing::instrument(skip_all, level = "debug")]
1049    async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
1050        // log_debug!(
1051        //     self.logger,
1052        //     "Checking confirmed contract. contract_id={}",
1053        //     contract.accepted_contract.get_contract_id_string()
1054        // );
1055        let closable_contract_info = self.get_closable_contract_info(contract).await;
1056        if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
1057            log_debug!(
1058                self.logger,
1059                "Found closable contract info. contract_id={} attestations={}",
1060                contract.accepted_contract.get_contract_id_string(),
1061                attestations.len()
1062            );
1063            let offer = &contract.accepted_contract.offered_contract;
1064            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1065
1066            //  === WARNING ===
1067            // This code could potentially be problematic. When running refund tests, it would look for a CET
1068            // but the CET would be invalid and refund would not pass. By only updating with a valid CET,
1069            // we then go to update. This way if it fails we can check for refund instead of bailing and getting locked
1070            // funds.
1071            if let Ok(cet) = crate::contract_updater::get_signed_cet(
1072                &self.secp,
1073                contract,
1074                contract_info,
1075                adaptor_info,
1076                &attestations,
1077                &signer,
1078                &self.logger,
1079            ) {
1080                log_info!(
1081                    self.logger,
1082                    "Found valid CET. Closing contract. contract_id={}",
1083                    contract.accepted_contract.get_contract_id_string()
1084                );
1085                match self
1086                    .close_contract(
1087                        contract,
1088                        cet,
1089                        attestations.iter().map(|x| x.1.clone()).collect(),
1090                    )
1091                    .await
1092                {
1093                    Ok(closed_contract) => {
1094                        log_info!(
1095                            self.logger,
1096                            "Updated contract to closed. contract_id={}",
1097                            contract.accepted_contract.get_contract_id_string()
1098                        );
1099                        self.store.update_contract(&closed_contract).await?;
1100                        return Ok(());
1101                    }
1102                    Err(e) => {
1103                        log_warn!(
1104                            self.logger,
1105                            "Failed to close contract. contract_id={} error={}",
1106                            contract.accepted_contract.get_contract_id_string(),
1107                            e.to_string()
1108                        );
1109                        return Err(e);
1110                    }
1111                }
1112            }
1113        }
1114
1115        // Check each pending close transaction
1116        for pending_close_tx in &contract
1117            .accepted_contract
1118            .dlc_transactions
1119            .pending_close_txs
1120        {
1121            let confirmations = self
1122                .blockchain
1123                .get_transaction_confirmations(&pending_close_tx.compute_txid())
1124                .await?;
1125
1126            log_debug!(
1127                self.logger,
1128                "Checking pending close transaction. close_txid={} contract_id={} confirmations={}",
1129                pending_close_tx.compute_txid().to_string(),
1130                contract.accepted_contract.get_contract_id_string(),
1131                confirmations
1132            );
1133
1134            if confirmations >= *NB_CONFIRMATIONS {
1135                // Found a fully confirmed pending close - move directly to Closed
1136                log_info!(self.logger,
1137                    "Pending close transaction is fully confirmed. Moving to closed. close_txid={} contract_id={}", 
1138                    pending_close_tx.compute_txid().to_string(),
1139                    contract.accepted_contract.get_contract_id_string()
1140                );
1141                let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
1142                let closed_contract = ClosedContract {
1143                    attestations: None, // Cooperative close has no attestations
1144                    signed_cet: Some(pending_close_tx.clone()), // Cooperative close doesn't use a CET
1145                    contract_id: contract.accepted_contract.get_contract_id(),
1146                    temporary_contract_id: contract.accepted_contract.offered_contract.id,
1147                    counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1148                    pnl,
1149                    funding_txid: contract
1150                        .accepted_contract
1151                        .dlc_transactions
1152                        .fund
1153                        .compute_txid(),
1154                    signed_contract: contract.clone(),
1155                };
1156
1157                self.store
1158                    .update_contract(&Contract::Closed(closed_contract))
1159                    .await?;
1160                break; // Only one close can be confirmed
1161            } else if confirmations >= 1 {
1162                // Found a confirmed but not fully confirmed pending close - move to PreClosed
1163                log_debug!(self.logger,
1164                    "Found a confirmed but not fully confirmed pending close. Moving to preclosed. close_txid={} contract_id={}", 
1165                    pending_close_tx.compute_txid().to_string(),
1166                    contract.accepted_contract.get_contract_id_string()
1167                );
1168                let preclosed_contract = PreClosedContract {
1169                    signed_contract: contract.clone(),
1170                    attestations: None, // Cooperative close has no attestations
1171                    signed_cet: pending_close_tx.clone(),
1172                };
1173
1174                self.store
1175                    .update_contract(&Contract::PreClosed(preclosed_contract))
1176                    .await?;
1177                break; // Only one close can be confirmed
1178            }
1179        }
1180
1181        self.check_refund(contract).await?;
1182
1183        Ok(())
1184    }
1185
1186    /// Manually close a contract with the oracle attestations.
1187    #[tracing::instrument(skip_all, level = "debug")]
1188    pub async fn close_confirmed_contract(
1189        &self,
1190        contract_id: &ContractId,
1191        attestations: Vec<(usize, OracleAttestation)>,
1192    ) -> Result<Contract, Error> {
1193        log_info!(
1194            self.logger,
1195            "Attempting to close confirmed contract manually. contract_id={} outcomes={:?}",
1196            contract_id.to_lower_hex_string(),
1197            attestations
1198                .iter()
1199                .map(|(_, a)| a.outcomes.clone())
1200                .collect::<Vec<_>>()
1201        );
1202        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1203        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
1204        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
1205
1206        // find the contract info that matches the attestations
1207        if let Some((contract_info, adaptor_info)) =
1208            contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
1209                let matches = attestations
1210                    .iter()
1211                    .filter(|(i, a)| {
1212                        c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
1213                    })
1214                    .count();
1215
1216                matches >= c.threshold
1217            })
1218        {
1219            let offer = &contract.accepted_contract.offered_contract;
1220            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1221            log_debug!(
1222                self.logger,
1223                "Getting signed CET. contract_id={}",
1224                contract.accepted_contract.get_contract_id_string()
1225            );
1226            let cet = crate::contract_updater::get_signed_cet(
1227                &self.secp,
1228                &contract,
1229                contract_info,
1230                adaptor_info,
1231                &attestations,
1232                &signer,
1233                &self.logger,
1234            )?;
1235
1236            // Check that the lock time has passed
1237            let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
1238                .expect("Time is not in valid range. This should never happen.");
1239            let height =
1240                Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
1241                    .expect("Height is not in valid range. This should never happen.");
1242            let locktime = cet.lock_time;
1243
1244            if !locktime.is_satisfied_by(height, time) {
1245                return Err(Error::InvalidState(
1246                    "CET lock time has not passed yet".to_string(),
1247                ));
1248            }
1249
1250            match self
1251                .close_contract(
1252                    &contract,
1253                    cet,
1254                    attestations.into_iter().map(|x| x.1).collect(),
1255                )
1256                .await
1257            {
1258                Ok(closed_contract) => {
1259                    log_info!(
1260                        self.logger,
1261                        "Closed contract manually. contract_id={}",
1262                        contract.accepted_contract.get_contract_id_string()
1263                    );
1264                    self.store.update_contract(&closed_contract).await?;
1265                    Ok(closed_contract)
1266                }
1267                Err(e) => {
1268                    log_error!(
1269                        self.logger,
1270                        "Failed to close contract. contract_id={} error={}",
1271                        contract.accepted_contract.get_contract_id_string(),
1272                        e.to_string()
1273                    );
1274                    Err(e)
1275                }
1276            }
1277        } else {
1278            Err(Error::InvalidState(
1279                "Attestations did not match contract infos".to_string(),
1280            ))
1281        }
1282    }
1283
1284    #[tracing::instrument(skip_all, level = "debug")]
1285    async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1286        for c in self.store.get_preclosed_contracts().await? {
1287            if let Err(e) = self.check_preclosed_contract(&c).await {
1288                log_error!(
1289                    self.logger,
1290                    "Error checking pre-closed contract. contract_id={} error={}",
1291                    c.signed_contract.accepted_contract.get_contract_id_string(),
1292                    e
1293                )
1294            }
1295        }
1296
1297        Ok(())
1298    }
1299
1300    #[tracing::instrument(skip_all, level = "debug")]
1301    async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1302        let broadcasted_txid = contract.signed_cet.compute_txid();
1303        let confirmations = self
1304            .blockchain
1305            .get_transaction_confirmations(&broadcasted_txid)
1306            .await?;
1307        log_debug!(
1308            self.logger,
1309            "Checking pre-closed contract. broadcasted_txid={} contract_id={} confirmations={}",
1310            broadcasted_txid.to_string(),
1311            contract
1312                .signed_contract
1313                .accepted_contract
1314                .get_contract_id_string(),
1315            confirmations
1316        );
1317        if confirmations >= *NB_CONFIRMATIONS {
1318            log_debug!(self.logger,
1319                "Pre-closed contract is fully confirmed. Moving to closed. broadcasted_txid={} contract_id={}", 
1320                broadcasted_txid.to_string(),
1321                contract.signed_contract.accepted_contract.get_contract_id_string()
1322            );
1323            let pnl = contract
1324                .signed_contract
1325                .accepted_contract
1326                .compute_pnl(&contract.signed_cet);
1327
1328            let closed_contract = ClosedContract {
1329                attestations: contract.attestations.clone(),
1330                signed_cet: Some(contract.signed_cet.clone()),
1331                contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1332                temporary_contract_id: contract
1333                    .signed_contract
1334                    .accepted_contract
1335                    .offered_contract
1336                    .id,
1337                counter_party_id: contract
1338                    .signed_contract
1339                    .accepted_contract
1340                    .offered_contract
1341                    .counter_party,
1342                funding_txid: contract
1343                    .signed_contract
1344                    .accepted_contract
1345                    .dlc_transactions
1346                    .fund
1347                    .compute_txid(),
1348                pnl,
1349                signed_contract: contract.signed_contract.clone(),
1350            };
1351            self.store
1352                .update_contract(&Contract::Closed(closed_contract))
1353                .await?;
1354        }
1355
1356        Ok(())
1357    }
1358
1359    #[tracing::instrument(skip_all, level = "debug")]
1360    async fn close_contract(
1361        &self,
1362        contract: &SignedContract,
1363        signed_cet: Transaction,
1364        attestations: Vec<OracleAttestation>,
1365    ) -> Result<Contract, Error> {
1366        let confirmations = self
1367            .blockchain
1368            .get_transaction_confirmations(&signed_cet.compute_txid())
1369            .await?;
1370
1371        if confirmations < 1 {
1372            log_info!(
1373                self.logger,
1374                "Broadcasting signed CET. txid={} contract_id={}",
1375                signed_cet.compute_txid().to_string(),
1376                contract.accepted_contract.get_contract_id_string()
1377            );
1378            // TODO(tibo): if this fails because another tx is already in
1379            // mempool or blockchain, we might have been cheated. There is
1380            // not much to be done apart from possibly extracting a fraud
1381            // proof but ideally it should be handled.
1382            self.blockchain.send_transaction(&signed_cet).await?;
1383
1384            let preclosed_contract = PreClosedContract {
1385                signed_contract: contract.clone(),
1386                attestations: Some(attestations),
1387                signed_cet,
1388            };
1389
1390            return Ok(Contract::PreClosed(preclosed_contract));
1391        } else if confirmations < *NB_CONFIRMATIONS {
1392            log_debug!(self.logger,
1393                "Found a confirmed but not fully confirmed pending close. Moving to preclosed. txid={} contract_id={}", 
1394                signed_cet.compute_txid().to_string(),
1395                contract.accepted_contract.get_contract_id_string()
1396            );
1397            let preclosed_contract = PreClosedContract {
1398                signed_contract: contract.clone(),
1399                attestations: Some(attestations),
1400                signed_cet,
1401            };
1402
1403            return Ok(Contract::PreClosed(preclosed_contract));
1404        }
1405
1406        let closed_contract = ClosedContract {
1407            attestations: Some(attestations.to_vec()),
1408            pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1409            signed_cet: Some(signed_cet),
1410            contract_id: contract.accepted_contract.get_contract_id(),
1411            temporary_contract_id: contract.accepted_contract.offered_contract.id,
1412            funding_txid: contract
1413                .accepted_contract
1414                .dlc_transactions
1415                .fund
1416                .compute_txid(),
1417            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1418            signed_contract: contract.clone(),
1419        };
1420
1421        Ok(Contract::Closed(closed_contract))
1422    }
1423
1424    /// Check if the refund locktime has passed and broadcast the refund transaction if it has.
1425    #[tracing::instrument(skip_all, level = "debug")]
1426    pub async fn check_and_broadcast_refund(
1427        &self,
1428        contract_id: &ContractId,
1429    ) -> Result<Contract, Error> {
1430        // Assert the contract is confirmed
1431        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1432
1433        if contract
1434            .accepted_contract
1435            .dlc_transactions
1436            .refund
1437            .lock_time
1438            .to_consensus_u32() as u64
1439            <= self.time.unix_time_now()
1440        {
1441            log_debug!(self.logger,
1442                "Refund locktime has passed. Attempting to broadcast refund. refund_txid={} contract_id={}", 
1443                contract.accepted_contract.dlc_transactions.refund.compute_txid().to_string(),
1444                contract.accepted_contract.get_contract_id_string()
1445            );
1446            let accepted_contract = &contract.accepted_contract;
1447            let refund = accepted_contract.dlc_transactions.refund.clone();
1448            let confirmations = self
1449                .blockchain
1450                .get_transaction_confirmations(&refund.compute_txid())
1451                .await?;
1452            if confirmations == 0 {
1453                log_debug!(self.logger,
1454                    "Refund transaction has not been broadcast yet. Sending transaction txid={} contract_id={}", 
1455                    refund.compute_txid().to_string(),
1456                    contract.accepted_contract.get_contract_id_string()
1457                );
1458                let offer = &contract.accepted_contract.offered_contract;
1459                let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1460                let refund = crate::contract_updater::get_signed_refund(
1461                    &self.secp,
1462                    &contract,
1463                    &signer,
1464                    &self.logger,
1465                )?;
1466                self.blockchain.send_transaction(&refund).await?;
1467            }
1468
1469            let refunded = Contract::Refunded(contract.clone());
1470            self.store.update_contract(&refunded).await?;
1471            Ok(refunded)
1472        } else {
1473            return Err(Error::InvalidParameters(
1474                "Contract maturity has not passed to broadcast refund".to_string(),
1475            ));
1476        }
1477    }
1478
1479    #[tracing::instrument(skip_all, level = "debug")]
1480    async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1481        if contract
1482            .accepted_contract
1483            .dlc_transactions
1484            .refund
1485            .lock_time
1486            .to_consensus_u32() as u64
1487            <= self.time.unix_time_now()
1488        {
1489            let refund_txid = contract
1490                .accepted_contract
1491                .dlc_transactions
1492                .refund
1493                .compute_txid();
1494            let confirmations = self
1495                .blockchain
1496                .get_transaction_confirmations(&refund_txid)
1497                .await?;
1498
1499            if confirmations > 0 {
1500                // Counterparty (or we) already broadcast the refund tx. Update state.
1501                self.store
1502                    .update_contract(&Contract::Refunded(contract.clone()))
1503                    .await?;
1504            } else if *AUTOMATIC_REFUND {
1505                self.check_and_broadcast_refund(&contract.accepted_contract.get_contract_id())
1506                    .await?;
1507            }
1508        }
1509
1510        Ok(())
1511    }
1512
1513    /// Function to call when we detect that a contract was closed by our counter party.
1514    /// This will update the state of the contract and return the [`Contract`] object.
1515    #[tracing::instrument(skip_all, level = "debug")]
1516    pub async fn on_counterparty_close(
1517        &mut self,
1518        contract: &SignedContract,
1519        closing_tx: Transaction,
1520        confirmations: u32,
1521    ) -> Result<Contract, Error> {
1522        // check if the closing tx actually spends the funding output
1523        if !closing_tx.input.iter().any(|i| {
1524            i.previous_output
1525                == contract
1526                    .accepted_contract
1527                    .dlc_transactions
1528                    .get_fund_outpoint()
1529        }) {
1530            log_error!(
1531                self.logger,
1532                "Closing tx does not spend the funding tx. txid={} contract_id={}",
1533                closing_tx.compute_txid().to_string(),
1534                contract.accepted_contract.get_contract_id_string()
1535            );
1536            return Err(Error::InvalidParameters(
1537                "Closing tx does not spend the funding tx".to_string(),
1538            ));
1539        }
1540
1541        // check if it is the refund tx (easy case)
1542        if contract
1543            .accepted_contract
1544            .dlc_transactions
1545            .refund
1546            .compute_txid()
1547            == closing_tx.compute_txid()
1548        {
1549            log_debug!(
1550                self.logger,
1551                "Closing tx is the refund tx. Moving to refunded. txid={} contract_id={}",
1552                closing_tx.compute_txid().to_string(),
1553                contract.accepted_contract.get_contract_id_string()
1554            );
1555            let refunded = Contract::Refunded(contract.clone());
1556            self.store.update_contract(&refunded).await?;
1557            return Ok(refunded);
1558        }
1559
1560        let contract = if confirmations < *NB_CONFIRMATIONS {
1561            log_info!(self.logger,
1562                "Closing transaction is not fully confirmed. Moving to preclosed. txid={} contract_id={}", 
1563                closing_tx.compute_txid().to_string(),
1564                contract.accepted_contract.get_contract_id_string()
1565            );
1566            Contract::PreClosed(PreClosedContract {
1567                signed_contract: contract.clone(),
1568                attestations: None, // todo in some cases we can get the attestations from the closing tx
1569                signed_cet: closing_tx,
1570            })
1571        } else {
1572            log_info!(
1573                self.logger,
1574                "Closing transaction is fully confirmed. Moving to closed. txid={} contract_id={}",
1575                closing_tx.compute_txid().to_string(),
1576                contract.accepted_contract.get_contract_id_string()
1577            );
1578            Contract::Closed(ClosedContract {
1579                attestations: None, // todo in some cases we can get the attestations from the closing tx
1580                pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1581                signed_cet: Some(closing_tx),
1582                contract_id: contract.accepted_contract.get_contract_id(),
1583                temporary_contract_id: contract.accepted_contract.offered_contract.id,
1584                counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1585                funding_txid: contract
1586                    .accepted_contract
1587                    .dlc_transactions
1588                    .fund
1589                    .compute_txid(),
1590                signed_contract: contract.clone(),
1591            })
1592        };
1593
1594        self.store.update_contract(&contract).await?;
1595
1596        Ok(contract)
1597    }
1598
1599    /// Initiates a cooperative close of a contract by creating and signing a closing transaction.
1600    /// Returns a CloseDlc message to be sent to the counter party.
1601    /// The contract remains in Confirmed state until the close transaction is broadcast.
1602    #[tracing::instrument(skip_all, level = "debug")]
1603    pub async fn cooperative_close_contract(
1604        &self,
1605        contract_id: &ContractId,
1606        counter_payout: Amount,
1607    ) -> Result<(CloseDlc, PublicKey), Error> {
1608        let signed_contract =
1609            get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1610
1611        let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1612            &self.secp,
1613            &signed_contract,
1614            counter_payout,
1615            &self.signer_provider,
1616            &self.logger,
1617        )?;
1618
1619        // Create updated contract with pending close transaction
1620        let mut updated_dlc_transactions =
1621            signed_contract.accepted_contract.dlc_transactions.clone();
1622        updated_dlc_transactions
1623            .pending_close_txs
1624            .push(close_tx.clone());
1625        log_debug!(
1626            self.logger,
1627            "Created updated contract with pending close transaction. close_txid={} contract_id={}",
1628            close_tx.compute_txid().to_string(),
1629            contract_id.to_lower_hex_string()
1630        );
1631
1632        let updated_accepted_contract = AcceptedContract {
1633            dlc_transactions: updated_dlc_transactions,
1634            ..signed_contract.accepted_contract.clone()
1635        };
1636
1637        let updated_signed_contract = SignedContract {
1638            accepted_contract: updated_accepted_contract,
1639            ..signed_contract.clone()
1640        };
1641
1642        // Update contract state to track pending close
1643        self.store
1644            .update_contract(&Contract::Confirmed(updated_signed_contract))
1645            .await?;
1646
1647        let counter_party = signed_contract
1648            .accepted_contract
1649            .offered_contract
1650            .counter_party;
1651
1652        Ok((close_message, counter_party))
1653    }
1654
1655    /// Accepts a cooperative close request by completing the close transaction
1656    /// and broadcasting it to the network.
1657    #[tracing::instrument(skip_all, level = "debug")]
1658    pub async fn accept_cooperative_close(
1659        &self,
1660        contract_id: &ContractId,
1661        close_message: &CloseDlc,
1662    ) -> Result<(), Error> {
1663        let signed_contract =
1664            get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1665
1666        let close_tx = crate::contract_updater::complete_cooperative_close(
1667            &self.secp,
1668            &signed_contract,
1669            close_message,
1670            &self.signer_provider,
1671            &self.logger,
1672        )?;
1673
1674        log_debug!(
1675            self.logger,
1676            "Completed cooperative close. close_txid={} contract_id={}",
1677            close_tx.compute_txid().to_string(),
1678            contract_id.to_lower_hex_string()
1679        );
1680        // Broadcast the closing transaction
1681        self.blockchain.send_transaction(&close_tx).await?;
1682
1683        // Create PreClosed contract (transaction broadcast but not confirmed yet)
1684        let preclosed_contract = PreClosedContract {
1685            signed_contract,
1686            attestations: None,
1687            signed_cet: close_tx,
1688        };
1689
1690        self.store
1691            .update_contract(&Contract::PreClosed(preclosed_contract))
1692            .await?;
1693
1694        Ok(())
1695    }
1696}
1697
1698impl<
1699        W: Deref,
1700        SP: Deref,
1701        B: Deref,
1702        S: Deref,
1703        O: Deref,
1704        T: Deref,
1705        F: Deref,
1706        X: ContractSigner,
1707        L: Deref,
1708    > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
1709where
1710    W::Target: Wallet,
1711    SP::Target: ContractSignerProvider<Signer = X>,
1712    B::Target: Blockchain,
1713    S::Target: Storage,
1714    O::Target: Oracle,
1715    T::Target: Time,
1716    F::Target: FeeEstimator,
1717    L::Target: Logger,
1718{
1719    /// Create a new channel offer and return the [`dlc_messages::channel::OfferChannel`]
1720    /// message to be sent to the `counter_party`.
1721    pub async fn offer_channel(
1722        &self,
1723        contract_input: &ContractInput,
1724        counter_party: PublicKey,
1725    ) -> Result<OfferChannel, Error> {
1726        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1727
1728        let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1729            &self.secp,
1730            contract_input,
1731            &counter_party,
1732            &oracle_announcements,
1733            CET_NSEQUENCE,
1734            REFUND_DELAY,
1735            &self.wallet,
1736            &self.signer_provider,
1737            &self.blockchain,
1738            &self.time,
1739            crate::utils::get_new_temporary_id(),
1740        )
1741        .await?;
1742
1743        let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1744
1745        self.store
1746            .upsert_channel(
1747                Channel::Offered(offered_channel),
1748                Some(Contract::Offered(offered_contract)),
1749            )
1750            .await?;
1751
1752        Ok(msg)
1753    }
1754
1755    /// Reject a channel that was offered. Returns the [`dlc_messages::channel::Reject`]
1756    /// message to be sent as well as the public key of the offering node.
1757    pub async fn reject_channel(
1758        &self,
1759        channel_id: &ChannelId,
1760    ) -> Result<(Reject, PublicKey), Error> {
1761        let offered_channel =
1762            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1763
1764        if offered_channel.is_offer_party {
1765            return Err(Error::InvalidState(
1766                "Cannot reject channel initiated by us.".to_string(),
1767            ));
1768        }
1769
1770        let offered_contract = get_contract_in_state!(
1771            self,
1772            &offered_channel.offered_contract_id,
1773            Offered,
1774            None as Option<PublicKey>
1775        )?;
1776
1777        let counterparty = offered_channel.counter_party;
1778        self.store
1779            .upsert_channel(
1780                Channel::Cancelled(offered_channel),
1781                Some(Contract::Rejected(offered_contract)),
1782            )
1783            .await?;
1784
1785        let msg = Reject {
1786            channel_id: *channel_id,
1787        };
1788        Ok((msg, counterparty))
1789    }
1790
1791    /// Accept a channel that was offered. Returns the [`dlc_messages::channel::AcceptChannel`]
1792    /// message to be sent, the updated [`crate::ChannelId`] and [`crate::ContractId`],
1793    /// as well as the public key of the offering node.
1794    pub async fn accept_channel(
1795        &self,
1796        channel_id: &ChannelId,
1797    ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1798        let offered_channel =
1799            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1800
1801        if offered_channel.is_offer_party {
1802            return Err(Error::InvalidState(
1803                "Cannot accept channel initiated by us.".to_string(),
1804            ));
1805        }
1806
1807        let offered_contract = get_contract_in_state!(
1808            self,
1809            &offered_channel.offered_contract_id,
1810            Offered,
1811            None as Option<PublicKey>
1812        )?;
1813
1814        let (accepted_channel, accepted_contract, accept_channel) =
1815            crate::channel_updater::accept_channel_offer(
1816                &self.secp,
1817                &offered_channel,
1818                &offered_contract,
1819                &self.wallet,
1820                &self.signer_provider,
1821                &self.blockchain,
1822            )
1823            .await?;
1824
1825        self.wallet.import_address(&Address::p2wsh(
1826            &accepted_contract.dlc_transactions.funding_script_pubkey,
1827            self.blockchain.get_network()?,
1828        ))?;
1829
1830        let channel_id = accepted_channel.channel_id;
1831        let contract_id = accepted_contract.get_contract_id();
1832        let counter_party = accepted_contract.offered_contract.counter_party;
1833
1834        self.store
1835            .upsert_channel(
1836                Channel::Accepted(accepted_channel),
1837                Some(Contract::Accepted(accepted_contract)),
1838            )
1839            .await?;
1840
1841        Ok((accept_channel, channel_id, contract_id, counter_party))
1842    }
1843
1844    /// Force close the channel with given [`crate::ChannelId`].
1845    pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1846        let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1847
1848        self.force_close_channel_internal(channel, true).await
1849    }
1850
1851    /// Offer to settle the balance of a channel so that the counter party gets
1852    /// `counter_payout`. Returns the [`dlc_messages::channel::SettleChannelOffer`]
1853    /// message to be sent and the public key of the counter party node.
1854    pub async fn settle_offer(
1855        &self,
1856        channel_id: &ChannelId,
1857        counter_payout: Amount,
1858    ) -> Result<(SettleOffer, PublicKey), Error> {
1859        let mut signed_channel =
1860            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1861
1862        let msg = crate::channel_updater::settle_channel_offer(
1863            &self.secp,
1864            &mut signed_channel,
1865            counter_payout,
1866            PEER_TIMEOUT,
1867            &self.signer_provider,
1868            &self.time,
1869        )?;
1870
1871        let counter_party = signed_channel.counter_party;
1872
1873        self.store
1874            .upsert_channel(Channel::Signed(signed_channel), None)
1875            .await?;
1876
1877        Ok((msg, counter_party))
1878    }
1879
1880    /// Accept a settlement offer, returning the [`SettleAccept`] message to be
1881    /// sent to the node with the returned [`PublicKey`] id.
1882    pub async fn accept_settle_offer(
1883        &self,
1884        channel_id: &ChannelId,
1885    ) -> Result<(SettleAccept, PublicKey), Error> {
1886        let mut signed_channel =
1887            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1888
1889        let msg = crate::channel_updater::settle_channel_accept(
1890            &self.secp,
1891            &mut signed_channel,
1892            CET_NSEQUENCE,
1893            0,
1894            PEER_TIMEOUT,
1895            &self.signer_provider,
1896            &self.time,
1897            &self.chain_monitor,
1898        )
1899        .await?;
1900
1901        let counter_party = signed_channel.counter_party;
1902
1903        self.store
1904            .upsert_channel(Channel::Signed(signed_channel), None)
1905            .await?;
1906
1907        Ok((msg, counter_party))
1908    }
1909
1910    /// Returns a [`RenewOffer`] message as well as the [`PublicKey`] of the
1911    /// counter party's node to offer the establishment of a new contract in the
1912    /// channel.
1913    pub async fn renew_offer(
1914        &self,
1915        channel_id: &ChannelId,
1916        counter_payout: Amount,
1917        contract_input: &ContractInput,
1918    ) -> Result<(RenewOffer, PublicKey), Error> {
1919        let mut signed_channel =
1920            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1921
1922        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1923
1924        let (msg, offered_contract) = crate::channel_updater::renew_offer(
1925            &self.secp,
1926            &mut signed_channel,
1927            contract_input,
1928            oracle_announcements,
1929            counter_payout,
1930            REFUND_DELAY,
1931            PEER_TIMEOUT,
1932            CET_NSEQUENCE,
1933            &self.signer_provider,
1934            &self.time,
1935        )?;
1936
1937        let counter_party = offered_contract.counter_party;
1938
1939        self.store
1940            .upsert_channel(
1941                Channel::Signed(signed_channel),
1942                Some(Contract::Offered(offered_contract)),
1943            )
1944            .await?;
1945
1946        Ok((msg, counter_party))
1947    }
1948
1949    /// Accept an offer to renew the contract in the channel. Returns the
1950    /// [`RenewAccept`] message to be sent to the peer with the returned
1951    /// [`PublicKey`] as node id.
1952    pub async fn accept_renew_offer(
1953        &self,
1954        channel_id: &ChannelId,
1955    ) -> Result<(RenewAccept, PublicKey), Error> {
1956        let mut signed_channel =
1957            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1958        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1959            Error::InvalidState("Expected to have a contract id but did not.".to_string())
1960        })?;
1961
1962        let offered_contract = get_contract_in_state!(
1963            self,
1964            &offered_contract_id,
1965            Offered,
1966            None as Option<PublicKey>
1967        )?;
1968
1969        let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1970            &self.secp,
1971            &mut signed_channel,
1972            &offered_contract,
1973            CET_NSEQUENCE,
1974            PEER_TIMEOUT,
1975            &self.signer_provider,
1976            &self.time,
1977        )?;
1978
1979        let counter_party = signed_channel.counter_party;
1980
1981        self.store
1982            .upsert_channel(
1983                Channel::Signed(signed_channel),
1984                Some(Contract::Accepted(accepted_contract)),
1985            )
1986            .await?;
1987
1988        Ok((msg, counter_party))
1989    }
1990
1991    /// Reject an offer to renew the contract in the channel. Returns the
1992    /// [`Reject`] message to be sent to the peer with the returned
1993    /// [`PublicKey`] node id.
1994    pub async fn reject_renew_offer(
1995        &self,
1996        channel_id: &ChannelId,
1997    ) -> Result<(Reject, PublicKey), Error> {
1998        let mut signed_channel =
1999            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2000        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2001            Error::InvalidState(
2002                "Expected to be in a state with an associated contract id but was not.".to_string(),
2003            )
2004        })?;
2005
2006        let offered_contract = get_contract_in_state!(
2007            self,
2008            &offered_contract_id,
2009            Offered,
2010            None as Option<PublicKey>
2011        )?;
2012
2013        let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
2014
2015        let counter_party = signed_channel.counter_party;
2016
2017        self.store
2018            .upsert_channel(
2019                Channel::Signed(signed_channel),
2020                Some(Contract::Rejected(offered_contract)),
2021            )
2022            .await?;
2023
2024        Ok((reject_msg, counter_party))
2025    }
2026
2027    /// Returns a [`Reject`] message to be sent to the counter party of the
2028    /// channel to inform them that the local party does not wish to accept the
2029    /// proposed settle offer.
2030    pub async fn reject_settle_offer(
2031        &self,
2032        channel_id: &ChannelId,
2033    ) -> Result<(Reject, PublicKey), Error> {
2034        let mut signed_channel =
2035            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2036
2037        let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
2038
2039        let counter_party = signed_channel.counter_party;
2040
2041        self.store
2042            .upsert_channel(Channel::Signed(signed_channel), None)
2043            .await?;
2044
2045        Ok((msg, counter_party))
2046    }
2047
2048    /// Returns a [`CollaborativeCloseOffer`] message to be sent to the counter
2049    /// party of the channel and update the state of the channel. Note that the
2050    /// channel will be forced closed after a timeout if the counter party does
2051    /// not broadcast the close transaction.
2052    pub async fn offer_collaborative_close(
2053        &self,
2054        channel_id: &ChannelId,
2055        counter_payout: Amount,
2056        additional_inputs: Vec<OutPoint>,
2057    ) -> Result<CollaborativeCloseOffer, Error> {
2058        let mut signed_channel =
2059            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2060
2061        let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
2062            &self.secp,
2063            &mut signed_channel,
2064            counter_payout,
2065            additional_inputs,
2066            &self.signer_provider,
2067            &self.time,
2068        )?;
2069
2070        self.chain_monitor.lock().await.add_tx(
2071            close_tx.compute_txid(),
2072            ChannelInfo {
2073                channel_id: *channel_id,
2074                tx_type: TxType::CollaborativeClose,
2075            },
2076        );
2077
2078        self.store
2079            .upsert_channel(Channel::Signed(signed_channel), None)
2080            .await?;
2081        self.store
2082            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2083            .await?;
2084
2085        Ok(msg)
2086    }
2087
2088    /// Accept an offer to collaboratively close the channel. The close transaction
2089    /// will be broadcast and the state of the channel updated.
2090    pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
2091        let signed_channel =
2092            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2093
2094        let closed_contract = if let Some(SignedChannelState::Established {
2095            signed_contract_id,
2096            ..
2097        }) = &signed_channel.roll_back_state
2098        {
2099            let counter_payout = get_signed_channel_state!(
2100                signed_channel,
2101                CollaborativeCloseOffered,
2102                counter_payout
2103            )?;
2104            Some(
2105                self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
2106                    .await?,
2107            )
2108        } else {
2109            None
2110        };
2111
2112        let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
2113            &self.secp,
2114            &signed_channel,
2115            &self.signer_provider,
2116        )?;
2117
2118        self.blockchain.send_transaction(&close_tx).await?;
2119
2120        self.store.upsert_channel(closed_channel, None).await?;
2121
2122        if let Some(closed_contract) = closed_contract {
2123            self.store
2124                .update_contract(&Contract::Closed(closed_contract))
2125                .await?;
2126        }
2127
2128        Ok(())
2129    }
2130
2131    async fn try_finalize_closing_established_channel(
2132        &self,
2133        signed_channel: SignedChannel,
2134    ) -> Result<(), Error> {
2135        let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
2136            signed_channel,
2137            Closing,
2138            buffer_transaction,
2139            contract_id,
2140            is_initiator
2141        )?;
2142
2143        if self
2144            .blockchain
2145            .get_transaction_confirmations(&buffer_tx.compute_txid())
2146            .await?
2147            >= CET_NSEQUENCE
2148        {
2149            log_info!(
2150                self.logger,
2151                "Buffer transaction for contract {} has enough confirmations to spend from it",
2152                serialize_hex(&contract_id)
2153            );
2154
2155            let confirmed_contract =
2156                get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
2157
2158            let (contract_info, adaptor_info, attestations) = self
2159                .get_closable_contract_info(&confirmed_contract)
2160                .await
2161                .ok_or_else(|| {
2162                    Error::InvalidState("Could not get information to close contract".to_string())
2163                })?;
2164
2165            let (signed_cet, closed_channel) =
2166                crate::channel_updater::finalize_unilateral_close_settled_channel(
2167                    &self.secp,
2168                    &signed_channel,
2169                    &confirmed_contract,
2170                    contract_info,
2171                    &attestations,
2172                    adaptor_info,
2173                    &self.signer_provider,
2174                    is_initiator,
2175                )?;
2176
2177            let closed_contract = self
2178                .close_contract(
2179                    &confirmed_contract,
2180                    signed_cet,
2181                    attestations.iter().map(|x| &x.1).cloned().collect(),
2182                )
2183                .await?;
2184
2185            self.chain_monitor
2186                .lock()
2187                .await
2188                .cleanup_channel(signed_channel.channel_id);
2189
2190            self.store
2191                .upsert_channel(closed_channel, Some(closed_contract))
2192                .await?;
2193        }
2194
2195        Ok(())
2196    }
2197
2198    async fn on_offer_channel(
2199        &self,
2200        offer_channel: &OfferChannel,
2201        counter_party: PublicKey,
2202    ) -> Result<(), Error> {
2203        offer_channel.validate(
2204            &self.secp,
2205            REFUND_DELAY,
2206            REFUND_DELAY * 2,
2207            CET_NSEQUENCE,
2208            CET_NSEQUENCE * 2,
2209        )?;
2210
2211        let keys_id = self
2212            .signer_provider
2213            .derive_signer_key_id(false, offer_channel.temporary_contract_id);
2214        let (channel, contract) =
2215            OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
2216
2217        contract.validate()?;
2218
2219        if self
2220            .store
2221            .get_channel(&channel.temporary_channel_id)
2222            .await?
2223            .is_some()
2224        {
2225            return Err(Error::InvalidParameters(
2226                "Channel with identical idea already in store".to_string(),
2227            ));
2228        }
2229
2230        self.store
2231            .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
2232            .await?;
2233
2234        Ok(())
2235    }
2236
2237    async fn on_accept_channel(
2238        &self,
2239        accept_channel: &AcceptChannel,
2240        peer_id: &PublicKey,
2241    ) -> Result<SignChannel, Error> {
2242        let offered_channel = get_channel_in_state!(
2243            self,
2244            &accept_channel.temporary_channel_id,
2245            Offered,
2246            Some(*peer_id)
2247        )?;
2248        let offered_contract = get_contract_in_state!(
2249            self,
2250            &offered_channel.offered_contract_id,
2251            Offered,
2252            Some(*peer_id)
2253        )?;
2254
2255        let (signed_channel, signed_contract, sign_channel) = {
2256            let res = crate::channel_updater::verify_and_sign_accepted_channel(
2257                &self.secp,
2258                &offered_channel,
2259                &offered_contract,
2260                accept_channel,
2261                //TODO(tibo): this should be parameterizable.
2262                CET_NSEQUENCE,
2263                &self.wallet,
2264                &self.signer_provider,
2265                &self.store,
2266                &self.chain_monitor,
2267                &self.logger,
2268            )
2269            .await;
2270
2271            match res {
2272                Ok(res) => res,
2273                Err(e) => {
2274                    let channel = crate::channel::FailedAccept {
2275                        temporary_channel_id: accept_channel.temporary_channel_id,
2276                        error_message: format!("Error validating accept channel: {e}"),
2277                        accept_message: accept_channel.clone(),
2278                        counter_party: *peer_id,
2279                    };
2280                    self.store
2281                        .upsert_channel(Channel::FailedAccept(channel), None)
2282                        .await?;
2283                    return Err(e);
2284                }
2285            }
2286        };
2287
2288        self.wallet.import_address(&Address::p2wsh(
2289            &signed_contract
2290                .accepted_contract
2291                .dlc_transactions
2292                .funding_script_pubkey,
2293            self.blockchain.get_network()?,
2294        ))?;
2295
2296        if let SignedChannelState::Established {
2297            buffer_transaction, ..
2298        } = &signed_channel.state
2299        {
2300            self.chain_monitor.lock().await.add_tx(
2301                buffer_transaction.compute_txid(),
2302                ChannelInfo {
2303                    channel_id: signed_channel.channel_id,
2304                    tx_type: TxType::BufferTx,
2305                },
2306            );
2307        } else {
2308            unreachable!();
2309        }
2310
2311        self.store
2312            .upsert_channel(
2313                Channel::Signed(signed_channel),
2314                Some(Contract::Signed(signed_contract)),
2315            )
2316            .await?;
2317
2318        self.store
2319            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2320            .await?;
2321
2322        Ok(sign_channel)
2323    }
2324
2325    async fn on_sign_channel(
2326        &self,
2327        sign_channel: &SignChannel,
2328        peer_id: &PublicKey,
2329    ) -> Result<(), Error> {
2330        let accepted_channel =
2331            get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
2332        let accepted_contract = get_contract_in_state!(
2333            self,
2334            &accepted_channel.accepted_contract_id,
2335            Accepted,
2336            Some(*peer_id)
2337        )?;
2338
2339        let (signed_channel, signed_contract, signed_fund_tx) = {
2340            let res = verify_signed_channel(
2341                &self.secp,
2342                &accepted_channel,
2343                &accepted_contract,
2344                sign_channel,
2345                &self.wallet,
2346                &self.chain_monitor,
2347                &self.store,
2348                &self.signer_provider,
2349                &self.logger,
2350            )
2351            .await;
2352
2353            match res {
2354                Ok(res) => res,
2355                Err(e) => {
2356                    let channel = crate::channel::FailedSign {
2357                        channel_id: sign_channel.channel_id,
2358                        error_message: format!("Error validating accept channel: {e}"),
2359                        sign_message: sign_channel.clone(),
2360                        counter_party: *peer_id,
2361                    };
2362                    self.store
2363                        .upsert_channel(Channel::FailedSign(channel), None)
2364                        .await?;
2365                    return Err(e);
2366                }
2367            }
2368        };
2369
2370        if let SignedChannelState::Established {
2371            buffer_transaction, ..
2372        } = &signed_channel.state
2373        {
2374            self.chain_monitor.lock().await.add_tx(
2375                buffer_transaction.compute_txid(),
2376                ChannelInfo {
2377                    channel_id: signed_channel.channel_id,
2378                    tx_type: TxType::BufferTx,
2379                },
2380            );
2381        } else {
2382            unreachable!();
2383        }
2384
2385        self.blockchain.send_transaction(&signed_fund_tx).await?;
2386
2387        self.store
2388            .upsert_channel(
2389                Channel::Signed(signed_channel),
2390                Some(Contract::Signed(signed_contract)),
2391            )
2392            .await?;
2393        self.store
2394            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2395            .await?;
2396
2397        Ok(())
2398    }
2399
2400    async fn on_settle_offer(
2401        &self,
2402        settle_offer: &SettleOffer,
2403        peer_id: &PublicKey,
2404    ) -> Result<Option<Reject>, Error> {
2405        let mut signed_channel =
2406            get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2407
2408        if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2409            return Ok(Some(Reject {
2410                channel_id: settle_offer.channel_id,
2411            }));
2412        }
2413
2414        crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2415
2416        self.store
2417            .upsert_channel(Channel::Signed(signed_channel), None)
2418            .await?;
2419
2420        Ok(None)
2421    }
2422
2423    async fn on_settle_accept(
2424        &self,
2425        settle_accept: &SettleAccept,
2426        peer_id: &PublicKey,
2427    ) -> Result<SettleConfirm, Error> {
2428        let mut signed_channel =
2429            get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2430
2431        let msg = crate::channel_updater::settle_channel_confirm(
2432            &self.secp,
2433            &mut signed_channel,
2434            settle_accept,
2435            CET_NSEQUENCE,
2436            0,
2437            PEER_TIMEOUT,
2438            &self.signer_provider,
2439            &self.time,
2440            &self.chain_monitor,
2441        )
2442        .await?;
2443
2444        self.store
2445            .upsert_channel(Channel::Signed(signed_channel), None)
2446            .await?;
2447
2448        Ok(msg)
2449    }
2450
2451    async fn on_settle_confirm(
2452        &self,
2453        settle_confirm: &SettleConfirm,
2454        peer_id: &PublicKey,
2455    ) -> Result<SettleFinalize, Error> {
2456        let mut signed_channel =
2457            get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2458        let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2459        let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2460            signed_channel,
2461            Established,
2462            buffer_transaction,
2463            own_buffer_adaptor_signature,
2464            is_offer,
2465            signed_contract_id
2466        )?;
2467
2468        let prev_buffer_txid = prev_buffer_tx.compute_txid();
2469        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2470        let is_offer = *is_offer;
2471        let signed_contract_id = *signed_contract_id;
2472
2473        let msg = crate::channel_updater::settle_channel_finalize(
2474            &self.secp,
2475            &mut signed_channel,
2476            settle_confirm,
2477            &self.signer_provider,
2478        )?;
2479
2480        self.chain_monitor.lock().await.add_tx(
2481            prev_buffer_txid,
2482            ChannelInfo {
2483                channel_id: signed_channel.channel_id,
2484                tx_type: TxType::Revoked {
2485                    update_idx: signed_channel.update_idx + 1,
2486                    own_adaptor_signature: own_buffer_adaptor_signature,
2487                    is_offer,
2488                    revoked_tx_type: RevokedTxType::Buffer,
2489                },
2490            },
2491        );
2492
2493        let closed_contract = Contract::Closed(
2494            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2495                .await?,
2496        );
2497
2498        self.store
2499            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2500            .await?;
2501        self.store
2502            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2503            .await?;
2504
2505        Ok(msg)
2506    }
2507
2508    async fn on_settle_finalize(
2509        &self,
2510        settle_finalize: &SettleFinalize,
2511        peer_id: &PublicKey,
2512    ) -> Result<(), Error> {
2513        let mut signed_channel =
2514            get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2515        let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2516        let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2517            signed_channel,
2518            Established,
2519            buffer_transaction,
2520            own_buffer_adaptor_signature,
2521            is_offer,
2522            signed_contract_id
2523        )?;
2524
2525        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2526        let is_offer = *is_offer;
2527        let buffer_txid = buffer_tx.compute_txid();
2528        let signed_contract_id = *signed_contract_id;
2529
2530        crate::channel_updater::settle_channel_on_finalize(
2531            &self.secp,
2532            &mut signed_channel,
2533            settle_finalize,
2534        )?;
2535
2536        self.chain_monitor.lock().await.add_tx(
2537            buffer_txid,
2538            ChannelInfo {
2539                channel_id: signed_channel.channel_id,
2540                tx_type: TxType::Revoked {
2541                    update_idx: signed_channel.update_idx + 1,
2542                    own_adaptor_signature: own_buffer_adaptor_signature,
2543                    is_offer,
2544                    revoked_tx_type: RevokedTxType::Buffer,
2545                },
2546            },
2547        );
2548
2549        let closed_contract = Contract::Closed(
2550            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2551                .await?,
2552        );
2553        self.store
2554            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2555            .await?;
2556        self.store
2557            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2558            .await?;
2559
2560        Ok(())
2561    }
2562
2563    async fn on_renew_offer(
2564        &self,
2565        renew_offer: &RenewOffer,
2566        peer_id: &PublicKey,
2567    ) -> Result<Option<Reject>, Error> {
2568        let mut signed_channel =
2569            get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2570
2571        // Received a renew offer when we already sent one, we reject it.
2572        if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2573            if is_offer {
2574                return Ok(Some(Reject {
2575                    channel_id: renew_offer.channel_id,
2576                }));
2577            }
2578        }
2579
2580        let offered_contract = crate::channel_updater::on_renew_offer(
2581            &mut signed_channel,
2582            renew_offer,
2583            PEER_TIMEOUT,
2584            &self.time,
2585        )?;
2586
2587        self.store.create_contract(&offered_contract).await?;
2588        self.store
2589            .upsert_channel(Channel::Signed(signed_channel), None)
2590            .await?;
2591
2592        Ok(None)
2593    }
2594
2595    async fn on_renew_accept(
2596        &self,
2597        renew_accept: &RenewAccept,
2598        peer_id: &PublicKey,
2599    ) -> Result<RenewConfirm, Error> {
2600        let mut signed_channel =
2601            get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2602        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2603            Error::InvalidState(
2604                "Expected to be in a state with an associated contract id but was not.".to_string(),
2605            )
2606        })?;
2607
2608        let offered_contract =
2609            get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2610
2611        let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2612            &self.secp,
2613            renew_accept,
2614            &mut signed_channel,
2615            &offered_contract,
2616            CET_NSEQUENCE,
2617            PEER_TIMEOUT,
2618            &self.wallet,
2619            &self.signer_provider,
2620            &self.time,
2621            &self.store,
2622            &self.logger,
2623        )
2624        .await?;
2625
2626        // Directly confirmed as we're in a channel the fund tx is already confirmed.
2627        self.store
2628            .upsert_channel(
2629                Channel::Signed(signed_channel),
2630                Some(Contract::Confirmed(signed_contract)),
2631            )
2632            .await?;
2633
2634        Ok(msg)
2635    }
2636
2637    async fn on_renew_confirm(
2638        &self,
2639        renew_confirm: &RenewConfirm,
2640        peer_id: &PublicKey,
2641    ) -> Result<RenewFinalize, Error> {
2642        let mut signed_channel =
2643            get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2644        let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2645        let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2646            Error::InvalidState(
2647                "Expected to be in a state with an associated contract id but was not.".to_string(),
2648            )
2649        })?;
2650
2651        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2652            .roll_back_state
2653            .as_ref()
2654            .expect("to have a rollback state")
2655        {
2656            SignedChannelState::Established {
2657                own_buffer_adaptor_signature,
2658                buffer_transaction,
2659                signed_contract_id,
2660                ..
2661            } => {
2662                let closed_contract = Contract::Closed(
2663                    self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2664                        .await?,
2665                );
2666                (
2667                    TxType::Revoked {
2668                        update_idx: signed_channel.update_idx,
2669                        own_adaptor_signature: *own_buffer_adaptor_signature,
2670                        is_offer: false,
2671                        revoked_tx_type: RevokedTxType::Buffer,
2672                    },
2673                    buffer_transaction.compute_txid(),
2674                    Some(closed_contract),
2675                )
2676            }
2677            SignedChannelState::Settled {
2678                settle_tx,
2679                own_settle_adaptor_signature,
2680                ..
2681            } => (
2682                TxType::Revoked {
2683                    update_idx: signed_channel.update_idx,
2684                    own_adaptor_signature: *own_settle_adaptor_signature,
2685                    is_offer: false,
2686                    revoked_tx_type: RevokedTxType::Settle,
2687                },
2688                settle_tx.compute_txid(),
2689                None,
2690            ),
2691            s => {
2692                return Err(Error::InvalidState(format!(
2693                    "Expected rollback state Established or Revoked but found {s:?}"
2694                )))
2695            }
2696        };
2697        let accepted_contract =
2698            get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2699
2700        let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2701            &self.secp,
2702            &mut signed_channel,
2703            &accepted_contract,
2704            renew_confirm,
2705            PEER_TIMEOUT,
2706            &self.time,
2707            &self.wallet,
2708            &self.signer_provider,
2709            &self.chain_monitor,
2710            &self.store,
2711            &self.logger,
2712        )
2713        .await?;
2714
2715        self.chain_monitor.lock().await.add_tx(
2716            prev_tx_id,
2717            ChannelInfo {
2718                channel_id: signed_channel.channel_id,
2719                tx_type,
2720            },
2721        );
2722
2723        // Directly confirmed as we're in a channel the fund tx is already confirmed.
2724        self.store
2725            .upsert_channel(
2726                Channel::Signed(signed_channel),
2727                Some(Contract::Confirmed(signed_contract)),
2728            )
2729            .await?;
2730
2731        self.store
2732            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2733            .await?;
2734
2735        if let Some(closed_contract) = closed_contract {
2736            self.store.update_contract(&closed_contract).await?;
2737        }
2738
2739        Ok(msg)
2740    }
2741
2742    async fn on_renew_finalize(
2743        &self,
2744        renew_finalize: &RenewFinalize,
2745        peer_id: &PublicKey,
2746    ) -> Result<RenewRevoke, Error> {
2747        let mut signed_channel =
2748            get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2749        let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2750
2751        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2752            .roll_back_state
2753            .as_ref()
2754            .expect("to have a rollback state")
2755        {
2756            SignedChannelState::Established {
2757                own_buffer_adaptor_signature,
2758                buffer_transaction,
2759                signed_contract_id,
2760                ..
2761            } => {
2762                let closed_contract = self
2763                    .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2764                    .await?;
2765                (
2766                    TxType::Revoked {
2767                        update_idx: signed_channel.update_idx,
2768                        own_adaptor_signature: *own_buffer_adaptor_signature,
2769                        is_offer: false,
2770                        revoked_tx_type: RevokedTxType::Buffer,
2771                    },
2772                    buffer_transaction.compute_txid(),
2773                    Some(Contract::Closed(closed_contract)),
2774                )
2775            }
2776            SignedChannelState::Settled {
2777                settle_tx,
2778                own_settle_adaptor_signature,
2779                ..
2780            } => (
2781                TxType::Revoked {
2782                    update_idx: signed_channel.update_idx,
2783                    own_adaptor_signature: *own_settle_adaptor_signature,
2784                    is_offer: false,
2785                    revoked_tx_type: RevokedTxType::Settle,
2786                },
2787                settle_tx.compute_txid(),
2788                None,
2789            ),
2790            s => {
2791                return Err(Error::InvalidState(format!(
2792                    "Expected rollback state of Established or Settled but was {s:?}"
2793                )))
2794            }
2795        };
2796
2797        let msg = crate::channel_updater::renew_channel_on_finalize(
2798            &self.secp,
2799            &mut signed_channel,
2800            renew_finalize,
2801            &self.signer_provider,
2802        )?;
2803
2804        self.chain_monitor.lock().await.add_tx(
2805            prev_tx_id,
2806            ChannelInfo {
2807                channel_id: signed_channel.channel_id,
2808                tx_type,
2809            },
2810        );
2811
2812        let buffer_tx =
2813            get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2814
2815        self.chain_monitor.lock().await.add_tx(
2816            buffer_tx.compute_txid(),
2817            ChannelInfo {
2818                channel_id: signed_channel.channel_id,
2819                tx_type: TxType::BufferTx,
2820            },
2821        );
2822
2823        self.store
2824            .upsert_channel(Channel::Signed(signed_channel), None)
2825            .await?;
2826        self.store
2827            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2828            .await?;
2829
2830        if let Some(closed_contract) = closed_contract {
2831            self.store.update_contract(&closed_contract).await?;
2832        }
2833
2834        Ok(msg)
2835    }
2836
2837    async fn on_renew_revoke(
2838        &self,
2839        renew_revoke: &RenewRevoke,
2840        peer_id: &PublicKey,
2841    ) -> Result<(), Error> {
2842        let mut signed_channel =
2843            get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2844
2845        crate::channel_updater::renew_channel_on_revoke(
2846            &self.secp,
2847            &mut signed_channel,
2848            renew_revoke,
2849        )?;
2850
2851        self.store
2852            .upsert_channel(Channel::Signed(signed_channel), None)
2853            .await?;
2854
2855        Ok(())
2856    }
2857
2858    async fn on_collaborative_close_offer(
2859        &self,
2860        close_offer: &CollaborativeCloseOffer,
2861        peer_id: &PublicKey,
2862    ) -> Result<(), Error> {
2863        let mut signed_channel =
2864            get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2865
2866        crate::channel_updater::on_collaborative_close_offer(
2867            &mut signed_channel,
2868            close_offer,
2869            PEER_TIMEOUT,
2870            &self.time,
2871        )?;
2872
2873        self.store
2874            .upsert_channel(Channel::Signed(signed_channel), None)
2875            .await?;
2876
2877        Ok(())
2878    }
2879
2880    async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2881        let channel = self.store.get_channel(&reject.channel_id).await?;
2882
2883        if let Some(channel) = channel {
2884            if channel.get_counter_party_id() != *counter_party {
2885                return Err(Error::InvalidParameters(format!(
2886                    "Peer {:02x?} is not involved with {} {:02x?}.",
2887                    counter_party,
2888                    stringify!(Channel),
2889                    channel.get_id()
2890                )));
2891            }
2892            match channel {
2893                Channel::Offered(offered_channel) => {
2894                    let offered_contract = get_contract_in_state!(
2895                        self,
2896                        &offered_channel.offered_contract_id,
2897                        Offered,
2898                        None as Option<PublicKey>
2899                    )?;
2900                    let utxos = offered_contract
2901                        .funding_inputs
2902                        .iter()
2903                        .map(|funding_input| {
2904                            let txid = Transaction::consensus_decode(
2905                                &mut funding_input.prev_tx.as_slice(),
2906                            )
2907                            .expect("Transaction Decode Error")
2908                            .compute_txid();
2909                            let vout = funding_input.prev_tx_vout;
2910                            OutPoint { txid, vout }
2911                        })
2912                        .collect::<Vec<_>>();
2913
2914                    self.wallet.unreserve_utxos(&utxos)?;
2915
2916                    // remove rejected channel, since nothing has been confirmed on chain yet.
2917                    self.store
2918                        .upsert_channel(
2919                            Channel::Cancelled(offered_channel),
2920                            Some(Contract::Rejected(offered_contract)),
2921                        )
2922                        .await?;
2923                }
2924                Channel::Signed(mut signed_channel) => {
2925                    let contract = match signed_channel.state {
2926                        SignedChannelState::RenewOffered {
2927                            offered_contract_id,
2928                            ..
2929                        } => {
2930                            let offered_contract = get_contract_in_state!(
2931                                self,
2932                                &offered_contract_id,
2933                                Offered,
2934                                None::<PublicKey>
2935                            )?;
2936                            Some(Contract::Rejected(offered_contract))
2937                        }
2938                        _ => None,
2939                    };
2940
2941                    crate::channel_updater::on_reject(&mut signed_channel)?;
2942
2943                    self.store
2944                        .upsert_channel(Channel::Signed(signed_channel), contract)
2945                        .await?;
2946                }
2947                channel => {
2948                    return Err(Error::InvalidState(format!(
2949                        "Not in a state adequate to receive a reject message. {channel:?}"
2950                    )))
2951                }
2952            }
2953        } else {
2954            log_warn!(
2955                self.logger,
2956                "Couldn't find rejected dlc channel with id: {}",
2957                reject.channel_id.to_lower_hex_string()
2958            );
2959        }
2960
2961        Ok(())
2962    }
2963
2964    async fn channel_checks(&self) -> Result<(), Error> {
2965        let established_closing_channels = self
2966            .store
2967            .get_signed_channels(Some(SignedChannelStateType::Closing))
2968            .await?;
2969
2970        for channel in established_closing_channels {
2971            if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2972                log_error!(
2973                    self.logger,
2974                    "Error trying to close established channel: {}",
2975                    e
2976                );
2977            }
2978        }
2979
2980        if let Err(e) = self.check_for_timed_out_channels().await {
2981            log_error!(self.logger, "Error checking timed out channels {}", e);
2982        }
2983        self.check_for_watched_tx().await
2984    }
2985
    /// Invokes the `check_for_timed_out_channels!` macro once for each of the
    /// renew and settle negotiation states (`RenewOffered`, `RenewAccepted`,
    /// `RenewConfirmed`, `SettledOffered`, `SettledAccepted`,
    /// `SettledConfirmed`), in that order.
    ///
    /// NOTE(review): the handling applied to a timed out channel is defined by
    /// the macro, which is not visible here; presumably any error it raises is
    /// returned to the caller. Confirm against the macro definition.
    async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
        check_for_timed_out_channels!(self, RenewOffered);
        check_for_timed_out_channels!(self, RenewAccepted);
        check_for_timed_out_channels!(self, RenewConfirmed);
        check_for_timed_out_channels!(self, SettledOffered);
        check_for_timed_out_channels!(self, SettledAccepted);
        check_for_timed_out_channels!(self, SettledConfirmed);

        Ok(())
    }
2996
2997    pub(crate) async fn process_watched_txs(
2998        &self,
2999        watched_txs: Vec<(Transaction, ChannelInfo)>,
3000    ) -> Result<(), Error> {
3001        for (tx, channel_info) in watched_txs {
3002            let mut signed_channel = match get_channel_in_state!(
3003                self,
3004                &channel_info.channel_id,
3005                Signed,
3006                None as Option<PublicKey>
3007            ) {
3008                Ok(c) => c,
3009                Err(e) => {
3010                    log_error!(
3011                        self.logger,
3012                        "Could not retrieve channel {:?}: {}",
3013                        channel_info.channel_id,
3014                        e
3015                    );
3016                    continue;
3017                }
3018            };
3019
3020            let persist = match channel_info.tx_type {
3021                TxType::BufferTx => {
3022                    // TODO(tibo): should only considered closed after some confirmations.
3023                    // Ideally should save previous state, and maybe restore in
3024                    // case of reorg, though if the counter party has sent the
3025                    // tx to close the channel it is unlikely that the tx will
3026                    // not be part of a future block.
3027
3028                    let contract_id = signed_channel
3029                        .get_contract_id()
3030                        .expect("to have a contract id");
3031                    let mut state = SignedChannelState::Closing {
3032                        buffer_transaction: tx.clone(),
3033                        is_initiator: false,
3034                        contract_id,
3035                        keys_id: signed_channel.keys_id().ok_or_else(|| {
3036                            Error::InvalidState("Expected to have keys_id.".to_string())
3037                        })?,
3038                    };
3039                    std::mem::swap(&mut signed_channel.state, &mut state);
3040
3041                    signed_channel.roll_back_state = Some(state);
3042
3043                    self.store
3044                        .upsert_channel(Channel::Signed(signed_channel), None)
3045                        .await?;
3046
3047                    false
3048                }
3049                TxType::Revoked {
3050                    update_idx,
3051                    own_adaptor_signature,
3052                    is_offer,
3053                    revoked_tx_type,
3054                } => {
3055                    let secret = signed_channel
3056                        .counter_party_commitment_secrets
3057                        .get_secret(update_idx)
3058                        .expect("to be able to retrieve the per update secret");
3059                    let counter_per_update_secret = SecretKey::from_slice(&secret)
3060                        .expect("to be able to parse the counter per update secret.");
3061
3062                    let per_update_seed_pk = signed_channel.own_per_update_seed;
3063
3064                    let per_update_seed_sk = self
3065                        .signer_provider
3066                        .get_secret_key_for_pubkey(&per_update_seed_pk)?;
3067
3068                    let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
3069                        per_update_seed_sk.as_ref(),
3070                        update_idx,
3071                    ))
3072                    .expect("a valid secret key.");
3073
3074                    let per_update_point =
3075                        PublicKey::from_secret_key(&self.secp, &per_update_secret);
3076
3077                    let own_revocation_params = signed_channel.own_points.get_revokable_params(
3078                        &self.secp,
3079                        &signed_channel.counter_points.revocation_basepoint,
3080                        &per_update_point,
3081                    );
3082
3083                    let counter_per_update_point =
3084                        PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
3085
3086                    let base_own_sk = self
3087                        .signer_provider
3088                        .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
3089
3090                    let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
3091
3092                    let counter_revocation_params =
3093                        signed_channel.counter_points.get_revokable_params(
3094                            &self.secp,
3095                            &signed_channel.own_points.revocation_basepoint,
3096                            &counter_per_update_point,
3097                        );
3098
3099                    let witness = if signed_channel.own_params.fund_pubkey
3100                        < signed_channel.counter_params.fund_pubkey
3101                    {
3102                        tx.input[0].witness.to_vec().remove(1)
3103                    } else {
3104                        tx.input[0].witness.to_vec().remove(2)
3105                    };
3106
3107                    let sig_data = witness
3108                        .iter()
3109                        .take(witness.len() - 1)
3110                        .cloned()
3111                        .collect::<Vec<_>>();
3112                    let own_sig = Signature::from_der(&sig_data)?;
3113
3114                    let counter_sk = own_adaptor_signature.recover(
3115                        &self.secp,
3116                        &own_sig,
3117                        &counter_revocation_params.publish_pk.inner,
3118                    )?;
3119
3120                    let own_revocation_base_secret =
3121                        &self.signer_provider.get_secret_key_for_pubkey(
3122                            &signed_channel.own_points.revocation_basepoint,
3123                        )?;
3124
3125                    let counter_revocation_sk = derive_private_revocation_key(
3126                        &self.secp,
3127                        &counter_per_update_secret,
3128                        own_revocation_base_secret,
3129                    );
3130
3131                    let (offer_params, accept_params) = if is_offer {
3132                        (&own_revocation_params, &counter_revocation_params)
3133                    } else {
3134                        (&counter_revocation_params, &own_revocation_params)
3135                    };
3136
3137                    let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
3138                        lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
3139                    ) / 250)
3140                        .into();
3141
3142                    let signed_tx = match revoked_tx_type {
3143                        RevokedTxType::Buffer => {
3144                            ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
3145                                &self.secp,
3146                                offer_params,
3147                                accept_params,
3148                                &own_sk,
3149                                &counter_sk,
3150                                &counter_revocation_sk,
3151                                &tx,
3152                                &self.wallet.get_new_address().await?,
3153                                0,
3154                                fee_rate_per_vb,
3155                            )?
3156                        }
3157                        RevokedTxType::Settle => {
3158                            ddk_dlc::channel::create_and_sign_punish_settle_transaction(
3159                                &self.secp,
3160                                offer_params,
3161                                accept_params,
3162                                &own_sk,
3163                                &counter_sk,
3164                                &counter_revocation_sk,
3165                                &tx,
3166                                &self.wallet.get_new_address().await?,
3167                                CET_NSEQUENCE,
3168                                0,
3169                                fee_rate_per_vb,
3170                                is_offer,
3171                            )?
3172                        }
3173                    };
3174
3175                    self.blockchain.send_transaction(&signed_tx).await?;
3176
3177                    let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
3178                        counter_party: signed_channel.counter_party,
3179                        temporary_channel_id: signed_channel.temporary_channel_id,
3180                        channel_id: signed_channel.channel_id,
3181                        punish_txid: signed_tx.compute_txid(),
3182                    });
3183
3184                    //TODO(tibo): should probably make sure the tx is confirmed somewhere before
3185                    //stop watching the cheating tx.
3186                    self.chain_monitor
3187                        .lock()
3188                        .await
3189                        .cleanup_channel(signed_channel.channel_id);
3190                    self.store.upsert_channel(closed_channel, None).await?;
3191                    true
3192                }
3193                TxType::CollaborativeClose => {
3194                    if let Some(SignedChannelState::Established {
3195                        signed_contract_id, ..
3196                    }) = signed_channel.roll_back_state
3197                    {
3198                        let counter_payout = get_signed_channel_state!(
3199                            signed_channel,
3200                            CollaborativeCloseOffered,
3201                            counter_payout
3202                        )?;
3203                        let closed_contract = self
3204                            .get_collaboratively_closed_contract(
3205                                &signed_contract_id,
3206                                *counter_payout,
3207                                false,
3208                            )
3209                            .await?;
3210                        self.store
3211                            .update_contract(&Contract::Closed(closed_contract))
3212                            .await?;
3213                    }
3214                    let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
3215                        counter_party: signed_channel.counter_party,
3216                        temporary_channel_id: signed_channel.temporary_channel_id,
3217                        channel_id: signed_channel.channel_id,
3218                    });
3219                    self.chain_monitor
3220                        .lock()
3221                        .await
3222                        .cleanup_channel(signed_channel.channel_id);
3223                    self.store.upsert_channel(closed_channel, None).await?;
3224                    true
3225                }
3226                TxType::SettleTx => {
3227                    let closed_channel = Channel::CounterClosed(ClosedChannel {
3228                        counter_party: signed_channel.counter_party,
3229                        temporary_channel_id: signed_channel.temporary_channel_id,
3230                        channel_id: signed_channel.channel_id,
3231                    });
3232                    self.chain_monitor
3233                        .lock()
3234                        .await
3235                        .cleanup_channel(signed_channel.channel_id);
3236                    self.store.upsert_channel(closed_channel, None).await?;
3237                    true
3238                }
3239                TxType::Cet => {
3240                    let contract_id = signed_channel.get_contract_id();
3241                    let closed_channel = {
3242                        match &signed_channel.state {
3243                            SignedChannelState::Closing { is_initiator, .. } => {
3244                                if *is_initiator {
3245                                    Channel::Closed(ClosedChannel {
3246                                        counter_party: signed_channel.counter_party,
3247                                        temporary_channel_id: signed_channel.temporary_channel_id,
3248                                        channel_id: signed_channel.channel_id,
3249                                    })
3250                                } else {
3251                                    Channel::CounterClosed(ClosedChannel {
3252                                        counter_party: signed_channel.counter_party,
3253                                        temporary_channel_id: signed_channel.temporary_channel_id,
3254                                        channel_id: signed_channel.channel_id,
3255                                    })
3256                                }
3257                            }
3258                            _ => {
3259                                log_error!(self.logger, "Saw spending of buffer transaction without being in closing state");
3260                                Channel::Closed(ClosedChannel {
3261                                    counter_party: signed_channel.counter_party,
3262                                    temporary_channel_id: signed_channel.temporary_channel_id,
3263                                    channel_id: signed_channel.channel_id,
3264                                })
3265                            }
3266                        }
3267                    };
3268
3269                    self.chain_monitor
3270                        .lock()
3271                        .await
3272                        .cleanup_channel(signed_channel.channel_id);
3273                    let pre_closed_contract = futures::stream::iter(contract_id)
3274                        .then(|contract_id| {
3275                            let txn = tx.clone();
3276                            async move {
3277                                self.store.get_contract(&contract_id).await.map(|contract| {
3278                                    contract.and_then(|contract| match contract {
3279                                        Contract::Confirmed(signed_contract) => {
3280                                            Some(Contract::PreClosed(PreClosedContract {
3281                                                signed_contract,
3282                                                attestations: None,
3283                                                signed_cet: txn,
3284                                            }))
3285                                        }
3286                                        _ => None,
3287                                    })
3288                                })
3289                            }
3290                        })
3291                        .try_collect::<Vec<_>>()
3292                        .await?
3293                        .into_iter()
3294                        .flatten()
3295                        .next();
3296
3297                    self.store
3298                        .upsert_channel(closed_channel, pre_closed_contract)
3299                        .await?;
3300
3301                    true
3302                }
3303            };
3304
3305            if persist {
3306                self.store
3307                    .persist_chain_monitor(&*self.chain_monitor.lock().await)
3308                    .await?;
3309            }
3310        }
3311        Ok(())
3312    }
3313
3314    async fn check_for_watched_tx(&self) -> Result<(), Error> {
3315        let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
3316
3317        self.process_watched_txs(confirmed_txs).await?;
3318
3319        self.store
3320            .persist_chain_monitor(&*self.chain_monitor.lock().await)
3321            .await?;
3322
3323        Ok(())
3324    }
3325
3326    async fn force_close_channel_internal(
3327        &self,
3328        mut channel: SignedChannel,
3329        is_initiator: bool,
3330    ) -> Result<(), Error> {
3331        match &channel.state {
3332            SignedChannelState::Established {
3333                counter_buffer_adaptor_signature,
3334                buffer_transaction,
3335                ..
3336            } => {
3337                let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
3338                let buffer_transaction = buffer_transaction.clone();
3339                self.initiate_unilateral_close_established_channel(
3340                    channel,
3341                    is_initiator,
3342                    counter_buffer_adaptor_signature,
3343                    buffer_transaction,
3344                )
3345                .await
3346            }
3347            SignedChannelState::RenewFinalized {
3348                buffer_transaction,
3349                offer_buffer_adaptor_signature,
3350                ..
3351            } => {
3352                let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
3353                let buffer_transaction = buffer_transaction.clone();
3354                self.initiate_unilateral_close_established_channel(
3355                    channel,
3356                    is_initiator,
3357                    offer_buffer_adaptor_signature,
3358                    buffer_transaction,
3359                )
3360                .await
3361            }
3362            SignedChannelState::Settled { .. } => {
3363                self.close_settled_channel(channel, is_initiator).await
3364            }
3365            SignedChannelState::SettledOffered { .. }
3366            | SignedChannelState::SettledReceived { .. }
3367            | SignedChannelState::SettledAccepted { .. }
3368            | SignedChannelState::SettledConfirmed { .. }
3369            | SignedChannelState::RenewOffered { .. }
3370            | SignedChannelState::RenewAccepted { .. }
3371            | SignedChannelState::RenewConfirmed { .. }
3372            | SignedChannelState::CollaborativeCloseOffered { .. } => {
3373                channel.state = channel
3374                    .roll_back_state
3375                    .take()
3376                    .expect("to have a rollback state");
3377                let channel_clone = channel.clone(); // Clone the channel to avoid moving it
3378                Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
3379            }
3380            SignedChannelState::Closing { .. } => Err(Error::InvalidState(
3381                "Channel is already closing.".to_string(),
3382            )),
3383        }
3384    }
3385
3386    /// Initiate the unilateral closing of a channel that has been established.
3387    async fn initiate_unilateral_close_established_channel(
3388        &self,
3389        mut signed_channel: SignedChannel,
3390        is_initiator: bool,
3391        buffer_adaptor_signature: EcdsaAdaptorSignature,
3392        buffer_transaction: Transaction,
3393    ) -> Result<(), Error> {
3394        let keys_id = signed_channel.keys_id().ok_or_else(|| {
3395            Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
3396        })?;
3397
3398        crate::channel_updater::initiate_unilateral_close_established_channel(
3399            &self.secp,
3400            &mut signed_channel,
3401            buffer_adaptor_signature,
3402            keys_id,
3403            buffer_transaction,
3404            &self.signer_provider,
3405            is_initiator,
3406        )?;
3407
3408        let buffer_transaction =
3409            get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3410
3411        self.blockchain.send_transaction(buffer_transaction).await?;
3412
3413        self.chain_monitor
3414            .lock()
3415            .await
3416            .remove_tx(&buffer_transaction.compute_txid());
3417
3418        self.store
3419            .upsert_channel(Channel::Signed(signed_channel), None)
3420            .await?;
3421
3422        self.store
3423            .persist_chain_monitor(&*self.chain_monitor.lock().await)
3424            .await?;
3425
3426        Ok(())
3427    }
3428
3429    /// Unilaterally close a channel that has been settled.
3430    async fn close_settled_channel(
3431        &self,
3432        signed_channel: SignedChannel,
3433        is_initiator: bool,
3434    ) -> Result<(), Error> {
3435        let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3436            &self.secp,
3437            &signed_channel,
3438            &self.signer_provider,
3439            is_initiator,
3440        )?;
3441
3442        if self
3443            .blockchain
3444            .get_transaction_confirmations(&settle_tx.compute_txid())
3445            .await
3446            .unwrap_or(0)
3447            == 0
3448        {
3449            self.blockchain.send_transaction(&settle_tx).await?;
3450        }
3451
3452        self.chain_monitor
3453            .lock()
3454            .await
3455            .cleanup_channel(signed_channel.channel_id);
3456
3457        self.store.upsert_channel(closed_channel, None).await?;
3458
3459        Ok(())
3460    }
3461
3462    async fn get_collaboratively_closed_contract(
3463        &self,
3464        contract_id: &ContractId,
3465        payout: Amount,
3466        is_own_payout: bool,
3467    ) -> Result<ClosedContract, Error> {
3468        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3469        let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3470            contract
3471                .accepted_contract
3472                .offered_contract
3473                .offer_params
3474                .collateral
3475        } else {
3476            contract.accepted_contract.accept_params.collateral
3477        };
3478        let own_payout = if is_own_payout {
3479            payout
3480        } else {
3481            contract.accepted_contract.offered_contract.total_collateral - payout
3482        };
3483        let pnl =
3484            SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3485        Ok(ClosedContract {
3486            attestations: None,
3487            signed_cet: None,
3488            contract_id: *contract_id,
3489            temporary_contract_id: contract.accepted_contract.offered_contract.id,
3490            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3491            funding_txid: contract
3492                .accepted_contract
3493                .dlc_transactions
3494                .fund
3495                .compute_txid(),
3496            pnl,
3497            signed_contract: contract.clone(),
3498        })
3499    }
3500
3501    async fn oracle_announcements(
3502        &self,
3503        contract_input: &ContractInput,
3504    ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3505        let announcements = stream::iter(contract_input.contract_infos.iter())
3506            .map(|x| {
3507                let future = self.get_oracle_announcements(&x.oracles);
3508                async move {
3509                    match future.await {
3510                        Ok(result) => Ok(result),
3511                        Err(e) => {
3512                            log_error!(self.logger, "Failed to get oracle announcements: {}", e);
3513                            Err(e)
3514                        }
3515                    }
3516                }
3517            })
3518            .collect::<FuturesUnordered<_>>()
3519            .await
3520            .try_collect::<Vec<_>>()
3521            .await?;
3522        Ok(announcements)
3523    }
3524}