ddk_manager/
manager.rs

//! # Manager
//!
//! A component to create and update DLCs.
2
3use super::{
4    Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13    accepted_contract::AcceptedContract, contract_info::ContractInfo,
14    contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15    signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16    FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::hex::DisplayHex;
26use bitcoin::{Address, Amount, SignedAmount};
27use bitcoin::{OutPoint, Transaction};
28use ddk_messages::channel::{
29    AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
30    RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
31    SettleOffer, SignChannel,
32};
33use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
34use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
35use futures::stream;
36use futures::stream::FuturesUnordered;
37use futures::{StreamExt, TryStreamExt};
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40    build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use lightning::util::logger::Logger;
43use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
44use secp256k1_zkp::XOnlyPublicKey;
45use secp256k1_zkp::{
46    ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
47};
48use std::collections::HashMap;
49use std::ops::Deref;
50use std::string::ToString;
51use std::sync::Arc;
52use tokio::sync::Mutex;
53
/// The number of confirmations required before moving to the confirmed state.
pub const NB_CONFIRMATIONS: u32 = 6;
/// The delay to set the refund value to.
///
/// NOTE(review): `86400 * 7` reads as seven days expressed in seconds — confirm the
/// unit against the code that consumes this constant.
pub const REFUND_DELAY: u32 = 86400 * 7;
/// The nSequence value used for CETs in DLC channels.
pub const CET_NSEQUENCE: u32 = 288;
/// Timeout in seconds when waiting for a peer's reply, after which a DLC channel
/// is forced closed.
pub const PEER_TIMEOUT: u64 = 3600;
63
/// Optional triple made of a borrowed [`ContractInfo`], a borrowed [`AdaptorInfo`] and
/// a list of `(usize, OracleAttestation)` pairs.
///
/// NOTE(review): the `usize` presumably identifies which oracle/announcement each
/// attestation belongs to — confirm at the use sites.
type ClosableContractInfo<'a> = Option<(
    &'a ContractInfo,
    &'a AdaptorInfo,
    Vec<(usize, OracleAttestation)>,
)>;
69
/// Used to create and update DLCs.
///
/// Every collaborator (wallet, signer provider, blockchain backend, storage, oracle
/// clients, clock, fee estimator and logger) is injected as a `Deref` to the
/// corresponding trait implementation.
#[derive(Debug)]
pub struct Manager<
    W: Deref,
    SP: Deref,
    B: Deref,
    S: Deref,
    O: Deref,
    T: Deref,
    F: Deref,
    X: ContractSigner,
    L: Deref,
> where
    W::Target: Wallet,
    SP::Target: ContractSignerProvider<Signer = X>,
    B::Target: Blockchain,
    S::Target: Storage,
    O::Target: Oracle,
    T::Target: Time,
    F::Target: FeeEstimator,
    L::Target: Logger,
{
    /// Oracle clients keyed by their x-only public key; looked up when fetching
    /// announcements.
    oracles: HashMap<XOnlyPublicKey, O>,
    /// Wallet handed to the contract updater functions and used to import funding
    /// addresses.
    wallet: W,
    /// Provider used to derive signer key ids and handed to the contract updater
    /// functions.
    signer_provider: SP,
    /// Blockchain backend: height, blocks, network, confirmations and transaction
    /// broadcast.
    blockchain: B,
    /// Storage backend for contracts, channels and the chain monitor.
    store: S,
    /// Secp256k1 context passed to the contract updater functions.
    secp: Secp256k1<All>,
    /// Chain monitor guarded by an async mutex; fed new blocks by
    /// `periodic_chain_monitor`.
    chain_monitor: Mutex<ChainMonitor>,
    /// Time source (provides `unix_time_now`).
    time: T,
    /// Fee estimator. NOTE(review): not used in the portion of the file reviewed here.
    fee_estimator: F,
    /// Logger used by the `log_*!` macros.
    logger: L,
}
103
/// Shorthand for `get_object_in_state!` with the object type fixed to `Contract` and
/// the store accessor fixed to `get_contract`.
///
/// Arguments, in order: the manager identifier, the contract id expression, the
/// expected `Contract` variant and the optional peer id expression. See
/// `get_object_in_state!` for the exact lookup and error semantics.
macro_rules! get_contract_in_state {
    ($mgr: ident, $id: expr, $expected_state: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected_state, $peer, Contract, get_contract)
    }};
}
116
/// Shorthand for `get_object_in_state!` with the object type fixed to `Channel` and
/// the store accessor fixed to `get_channel`.
///
/// Arguments, in order: the manager identifier, the channel id expression, the
/// expected `Channel` variant and the optional peer id expression. See
/// `get_object_in_state!` for the exact lookup and error semantics.
macro_rules! get_channel_in_state {
    ($mgr: ident, $id: expr, $expected_state: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected_state, $peer, Channel, get_channel)
    }};
}
129
/// Extracts the named fields from the roll-back state of a signed channel.
///
/// Evaluates to `Ok((&field, ...))` when `roll_back_state` is
/// `Some(SignedChannelState::$state { .. })`, and to `Err(Error::InvalidState(..))`
/// otherwise (including when no roll-back state is set).
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state { $($field,)* .. }) => Ok(($($field,)*)),
            // Report the roll-back state that failed to match (not the channel's
            // current state) so the message describes the value that was inspected.
            _ => Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($state),
                $signed_channel.roll_back_state
            ))),
        }
    }};
}
138
/// Force closes every signed channel in the given state whose `timeout` is earlier
/// than the manager's current unix time.
///
/// Expands to statements (not a block) and uses `?` on the store lookup, so it must be
/// invoked inside an `async` function returning a compatible `Result`. Failures to
/// force close an individual channel are logged and do not abort the loop.
macro_rules! check_for_timed_out_channels {
    ($mgr: ident, $variant: ident) => {
        let candidates = $mgr
            .store
            .get_signed_channels(Some(SignedChannelStateType::$variant))
            .await?;

        for candidate in candidates {
            if let SignedChannelState::$variant { timeout, .. } = candidate.state {
                if timeout < $mgr.time.unix_time_now() {
                    if let Err(e) = $mgr.force_close_channel_internal(candidate, true).await {
                        log_error!($mgr.logger, "Error force closing channel. error={}", e);
                    }
                }
            }
        }
    };
}
161
162impl<
163        W: Deref,
164        SP: Deref,
165        B: Deref,
166        S: Deref,
167        O: Deref,
168        T: Deref,
169        F: Deref,
170        X: ContractSigner,
171        L: Deref,
172    > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
173where
174    W::Target: Wallet,
175    SP::Target: ContractSignerProvider<Signer = X>,
176    B::Target: Blockchain,
177    S::Target: Storage,
178    O::Target: Oracle,
179    T::Target: Time,
180    F::Target: FeeEstimator,
181    L::Target: Logger,
182{
183    /// Create a new Manager struct.
184    #[allow(clippy::too_many_arguments)]
185    #[tracing::instrument(skip_all)]
186    pub async fn new(
187        wallet: W,
188        signer_provider: SP,
189        blockchain: B,
190        store: S,
191        oracles: HashMap<XOnlyPublicKey, O>,
192        time: T,
193        fee_estimator: F,
194        logger: L,
195    ) -> Result<Self, Error> {
196        let init_height = blockchain.get_blockchain_height().await?;
197        let chain_monitor = Mutex::new(
198            store
199                .get_chain_monitor()
200                .await?
201                .unwrap_or(ChainMonitor::new(init_height)),
202        );
203
204        let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
205
206        log_info!(logger, "Manager initialized");
207
208        Ok(Manager {
209            secp: secp256k1_zkp::Secp256k1::new(),
210            wallet,
211            signer_provider,
212            blockchain,
213            store,
214            oracles,
215            time,
216            fee_estimator,
217            chain_monitor,
218            logger,
219        })
220    }
221
222    /// Get the store from the Manager to access contracts.
223    pub fn get_store(&self) -> &S {
224        &self.store
225    }
226
227    /// Function called to pass a DlcMessage to the Manager.
228    #[tracing::instrument(skip_all)]
229    pub async fn on_dlc_message(
230        &self,
231        msg: &DlcMessage,
232        counter_party: PublicKey,
233    ) -> Result<Option<DlcMessage>, Error> {
234        match msg {
235            DlcMessage::Offer(o) => {
236                log_debug!(self.logger, "Received offer message");
237                self.on_offer_message(o, counter_party).await?;
238                Ok(None)
239            }
240            DlcMessage::Accept(a) => {
241                log_debug!(self.logger, "Received accept message");
242                Ok(Some(self.on_accept_message(a, &counter_party).await?))
243            }
244            DlcMessage::Sign(s) => {
245                log_debug!(self.logger, "Received sign message");
246                self.on_sign_message(s, &counter_party).await?;
247                Ok(None)
248            }
249            DlcMessage::Close(c) => {
250                log_debug!(self.logger, "Received close message");
251                self.on_close_message(c, &counter_party).await?;
252                Ok(None)
253            }
254            DlcMessage::OfferChannel(o) => {
255                log_debug!(self.logger, "Received offer channel message");
256                self.on_offer_channel(o, counter_party).await?;
257                Ok(None)
258            }
259            DlcMessage::AcceptChannel(a) => {
260                log_debug!(self.logger, "Received accept channel message");
261                Ok(Some(DlcMessage::SignChannel(
262                    self.on_accept_channel(a, &counter_party).await?,
263                )))
264            }
265            DlcMessage::SignChannel(s) => {
266                log_debug!(self.logger, "Received sign channel message");
267                self.on_sign_channel(s, &counter_party).await?;
268                Ok(None)
269            }
270            DlcMessage::SettleOffer(s) => {
271                log_debug!(self.logger, "Received settle offer message");
272                match self.on_settle_offer(s, &counter_party).await? {
273                    Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
274                    None => Ok(None),
275                }
276            }
277            DlcMessage::SettleAccept(s) => {
278                log_debug!(self.logger, "Received settle accept message");
279                Ok(Some(DlcMessage::SettleConfirm(
280                    self.on_settle_accept(s, &counter_party).await?,
281                )))
282            }
283            DlcMessage::SettleConfirm(s) => {
284                log_debug!(self.logger, "Received settle confirm message");
285                Ok(Some(DlcMessage::SettleFinalize(
286                    self.on_settle_confirm(s, &counter_party).await?,
287                )))
288            }
289            DlcMessage::SettleFinalize(s) => {
290                log_debug!(self.logger, "Received settle finalize message");
291                self.on_settle_finalize(s, &counter_party).await?;
292                Ok(None)
293            }
294            DlcMessage::RenewOffer(r) => {
295                log_debug!(self.logger, "Received renew offer message");
296                match self.on_renew_offer(r, &counter_party).await? {
297                    Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
298                    None => Ok(None),
299                }
300            }
301            DlcMessage::RenewAccept(r) => {
302                log_debug!(self.logger, "Received renew accept message");
303                Ok(Some(DlcMessage::RenewConfirm(
304                    self.on_renew_accept(r, &counter_party).await?,
305                )))
306            }
307            DlcMessage::RenewConfirm(r) => {
308                log_debug!(self.logger, "Received renew confirm message");
309                Ok(Some(DlcMessage::RenewFinalize(
310                    self.on_renew_confirm(r, &counter_party).await?,
311                )))
312            }
313            DlcMessage::RenewFinalize(r) => {
314                log_debug!(self.logger, "Received renew finalize message");
315                let revoke = self.on_renew_finalize(r, &counter_party).await?;
316                Ok(Some(DlcMessage::RenewRevoke(revoke)))
317            }
318            DlcMessage::RenewRevoke(r) => {
319                log_debug!(self.logger, "Received renew revoke message");
320                self.on_renew_revoke(r, &counter_party).await?;
321                Ok(None)
322            }
323            DlcMessage::CollaborativeCloseOffer(c) => {
324                log_debug!(self.logger, "Received collaborative close offer message");
325                self.on_collaborative_close_offer(c, &counter_party).await?;
326                Ok(None)
327            }
328            DlcMessage::Reject(r) => {
329                log_debug!(self.logger, "Received reject message");
330                self.on_reject(r, &counter_party).await?;
331                Ok(None)
332            }
333        }
334    }
335
336    /// Create a new spliced offer
337    #[tracing::instrument(skip_all)]
338    pub async fn send_splice_offer(
339        &self,
340        contract_input: &ContractInput,
341        counter_party: PublicKey,
342        contract_id: &ContractId,
343    ) -> Result<OfferDlc, Error> {
344        let oracle_announcements = self.oracle_announcements(contract_input).await?;
345
346        self.send_splice_offer_with_announcements(
347            contract_input,
348            counter_party,
349            contract_id,
350            oracle_announcements,
351        )
352        .await
353    }
354
355    /// Creates a new offer DLC using an existing DLC as an input.
356    /// The new DLC MUST use a Confirmed contract as an input.
357    #[tracing::instrument(skip_all)]
358    pub async fn send_splice_offer_with_announcements(
359        &self,
360        contract_input: &ContractInput,
361        counter_party: PublicKey,
362        contract_id: &ContractId,
363        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
364    ) -> Result<OfferDlc, Error> {
365        let confirmed_contract =
366            get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
367
368        let dlc_input = confirmed_contract.get_dlc_input();
369
370        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
371            &self.secp,
372            contract_input,
373            oracle_announcements,
374            vec![dlc_input],
375            REFUND_DELAY,
376            &counter_party,
377            &self.wallet,
378            &self.blockchain,
379            &self.time,
380            &self.signer_provider,
381            &self.logger,
382        )
383        .await?;
384
385        offered_contract.validate()?;
386
387        self.store.create_contract(&offered_contract).await?;
388
389        Ok(offer_msg)
390    }
391
392    /// Function called to create a new DLC. The offered contract will be stored
393    /// and an OfferDlc message returned.
394    ///
395    /// This function will fetch the oracle announcements from the oracle.
396    #[tracing::instrument(skip_all)]
397    pub async fn send_offer(
398        &self,
399        contract_input: &ContractInput,
400        counter_party: PublicKey,
401    ) -> Result<OfferDlc, Error> {
402        // If the oracle announcement fails to retrieve, then log and continue.
403        let oracle_announcements = self.oracle_announcements(contract_input).await?;
404
405        self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
406            .await
407    }
408
409    /// Function called to create a new DLC. The offered contract will be stored
410    /// and an OfferDlc message returned.
411    ///
412    /// This function allows to pass the oracle announcements directly instead of
413    /// fetching them from the oracle.
414    #[tracing::instrument(skip_all)]
415    pub async fn send_offer_with_announcements(
416        &self,
417        contract_input: &ContractInput,
418        counter_party: PublicKey,
419        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
420    ) -> Result<OfferDlc, Error> {
421        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
422            &self.secp,
423            contract_input,
424            oracle_announcements,
425            vec![],
426            REFUND_DELAY,
427            &counter_party,
428            &self.wallet,
429            &self.blockchain,
430            &self.time,
431            &self.signer_provider,
432            &self.logger,
433        )
434        .await?;
435
436        offered_contract.validate()?;
437
438        self.store.create_contract(&offered_contract).await?;
439
440        Ok(offer_msg)
441    }
442
443    /// Function to call to accept a DLC for which an offer was received.
444    #[tracing::instrument(skip_all)]
445    pub async fn accept_contract_offer(
446        &self,
447        contract_id: &ContractId,
448    ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
449        let offered_contract =
450            get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
451
452        let counter_party = offered_contract.counter_party;
453
454        let (accepted_contract, accept_msg) = accept_contract(
455            &self.secp,
456            &offered_contract,
457            &self.wallet,
458            &self.signer_provider,
459            &self.blockchain,
460            &self.logger,
461        )
462        .await?;
463
464        self.wallet.import_address(&Address::p2wsh(
465            &accepted_contract.dlc_transactions.funding_script_pubkey,
466            self.blockchain.get_network()?,
467        ))?;
468
469        let contract_id = accepted_contract.get_contract_id();
470
471        self.store
472            .update_contract(&Contract::Accepted(accepted_contract))
473            .await?;
474
475        log_info!(
476            self.logger,
477            "Accepted and stored the contract. temp_id={} contract_id={}",
478            offered_contract.id.to_lower_hex_string(),
479            contract_id.to_lower_hex_string(),
480        );
481
482        Ok((contract_id, counter_party, accept_msg))
483    }
484
485    /// Function to update the state of the [`ChainMonitor`] with new
486    /// blocks.
487    ///
488    /// Consumers **MUST** call this periodically in order to
489    /// determine when pending transactions reach confirmation.
490    #[tracing::instrument(skip_all)]
491    pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
492        let cur_height = self.blockchain.get_blockchain_height().await?;
493        let last_height = self.chain_monitor.lock().await.last_height;
494
495        // TODO(luckysori): We could end up reprocessing a block at
496        // the same height if there is a reorg.
497        if cur_height < last_height {
498            return Err(Error::InvalidState(
499                "Current height is lower than last height.".to_string(),
500            ));
501        }
502
503        for height in last_height + 1..=cur_height {
504            let block = self.blockchain.get_block_at_height(height).await?;
505
506            self.chain_monitor
507                .lock()
508                .await
509                .process_block(&block, height);
510        }
511
512        Ok(())
513    }
514
515    /// Function to call to check the state of the currently executing DLCs and
516    /// update them if possible.
517    #[tracing::instrument(skip_all, level = "debug")]
518    pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
519        let signed_contracts = self.check_for_spliced_contract().await?;
520        self.check_signed_contracts(&signed_contracts).await?;
521        self.check_confirmed_contracts().await?;
522        self.check_preclosed_contracts().await?;
523
524        if check_channels {
525            self.channel_checks().await?;
526        }
527
528        Ok(())
529    }
530
531    /// Function to call to offer a DLC.
532    #[tracing::instrument(skip_all)]
533    pub async fn on_offer_message(
534        &self,
535        offered_message: &OfferDlc,
536        counter_party: PublicKey,
537    ) -> Result<(), Error> {
538        offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
539        let keys_id = self
540            .signer_provider
541            .derive_signer_key_id(false, offered_message.temporary_contract_id);
542        let contract: OfferedContract =
543            OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
544        contract.validate()?;
545
546        if self.store.get_contract(&contract.id).await?.is_some() {
547            return Err(Error::InvalidParameters(
548                "Contract with identical id already exists".to_string(),
549            ));
550        }
551
552        self.store.create_contract(&contract).await?;
553        log_info!(
554            self.logger,
555            "Created and stored the offered contract. temp_id={}",
556            contract.id.to_lower_hex_string(),
557        );
558        Ok(())
559    }
560
561    /// Function to call to close a DLC.
562    #[tracing::instrument(skip_all)]
563    pub async fn on_close_message(
564        &self,
565        close_msg: &CloseDlc,
566        counter_party: &PublicKey,
567    ) -> Result<(), Error> {
568        // Validate that the contract exists and is in the correct state
569        let signed_contract = get_contract_in_state!(
570            self,
571            &close_msg.contract_id,
572            Confirmed,
573            Some(*counter_party)
574        )?;
575
576        // Validate the close message by attempting to construct the close transaction
577        // This verifies the signature and transaction structure without broadcasting
578        let _close_tx = crate::contract_updater::complete_cooperative_close(
579            &self.secp,
580            &signed_contract,
581            close_msg,
582            &self.signer_provider,
583            &self.logger,
584        )?;
585
586        // Message is valid - the application layer should call accept_cooperative_close()
587        // if they want to accept the offered terms
588        Ok(())
589    }
590
591    /// Function to call to accept a DLC for which an offer was received.
592    #[tracing::instrument(skip_all)]
593    pub async fn on_accept_message(
594        &self,
595        accept_msg: &AcceptDlc,
596        counter_party: &PublicKey,
597    ) -> Result<DlcMessage, Error> {
598        let offered_contract = get_contract_in_state!(
599            self,
600            &accept_msg.temporary_contract_id,
601            Offered,
602            Some(*counter_party)
603        )?;
604
605        let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
606            &self.secp,
607            &offered_contract,
608            accept_msg,
609            &self.wallet,
610            &self.signer_provider,
611            &self.store,
612            &self.logger,
613        )
614        .await
615        {
616            Ok(contract) => contract,
617            Err(e) => {
618                log_error!(
619                    self.logger,
620                    "Error in on_accept_message. tmp_contract_id={} error={}",
621                    offered_contract.id.to_lower_hex_string(),
622                    e.to_string()
623                );
624                return self
625                    .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
626                    .await;
627            }
628        };
629
630        log_info!(
631            self.logger,
632            "Verified the accept message and signed the contract. temp_id={} contract_id={}",
633            offered_contract.id.to_lower_hex_string(),
634            signed_contract.accepted_contract.get_contract_id_string(),
635        );
636
637        let contract_id = signed_contract.accepted_contract.get_contract_id_string();
638
639        self.wallet.import_address(&Address::p2wsh(
640            &signed_contract
641                .accepted_contract
642                .dlc_transactions
643                .funding_script_pubkey,
644            self.blockchain.get_network()?,
645        ))?;
646
647        self.store
648            .update_contract(&Contract::Signed(signed_contract))
649            .await?;
650
651        log_info!(
652            self.logger,
653            "Accepted and signed the contract. temp_id={} contract_id={}",
654            offered_contract.id.to_lower_hex_string(),
655            contract_id,
656        );
657
658        Ok(DlcMessage::Sign(signed_msg))
659    }
660
661    /// Function to call to sign a DLC for which an accept was received.
662    #[tracing::instrument(skip_all)]
663    pub async fn on_sign_message(
664        &self,
665        sign_message: &SignDlc,
666        peer_id: &PublicKey,
667    ) -> Result<(), Error> {
668        let accepted_contract =
669            get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
670        let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
671            &self.secp,
672            &accepted_contract,
673            sign_message,
674            &self.wallet,
675            &self.store,
676            &self.signer_provider,
677            &self.logger,
678        )
679        .await
680        {
681            Ok(contract) => contract,
682            Err(e) => {
683                log_error!(
684                    self.logger,
685                    "Error in on_sign_message. contract_id={} error={}",
686                    accepted_contract.get_contract_id_string(),
687                    e.to_string()
688                );
689                return self
690                    .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
691                    .await;
692            }
693        };
694
695        self.store
696            .update_contract(&Contract::Signed(signed_contract.clone()))
697            .await?;
698
699        self.blockchain.send_transaction(&fund_tx).await?;
700
701        // Check if there are any DLC inputs in the funding inputs of the contract.
702        // If there are, mark the contract as pre-closed.
703        if accepted_contract
704            .offered_contract
705            .funding_inputs
706            .iter()
707            .any(|c| c.dlc_input.is_some())
708        {
709            let Some(dlc_input) = accepted_contract
710                .offered_contract
711                .funding_inputs
712                .iter()
713                .find(|c| c.dlc_input.is_some())
714            else {
715                return Ok(());
716            };
717            let preclosed_contract = get_contract_in_state!(
718                self,
719                &dlc_input.dlc_input.as_ref().unwrap().contract_id,
720                Confirmed,
721                None as Option<PublicKey>
722            )?;
723
724            let preclosed_contract = PreClosedContract {
725                signed_contract: preclosed_contract.clone(),
726                attestations: None,
727                signed_cet: signed_contract
728                    .accepted_contract
729                    .dlc_transactions
730                    .fund
731                    .clone(),
732            };
733
734            log_debug!(self.logger,
735                "Contract contains a DLC input. Marking the previous contract as pre-closed. dlc_input_contract_id={}", 
736                preclosed_contract.signed_contract.accepted_contract.get_contract_id_string()
737            );
738
739            self.store
740                .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
741                .await?;
742        }
743
744        Ok(())
745    }
746
747    #[tracing::instrument(skip_all, level = "debug")]
748    async fn get_oracle_announcements(
749        &self,
750        oracle_inputs: &OracleInput,
751    ) -> Result<Vec<OracleAnnouncement>, Error> {
752        let mut announcements = Vec::new();
753        for pubkey in &oracle_inputs.public_keys {
754            let oracle = self
755                .oracles
756                .get(pubkey)
757                .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
758            let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
759            announcements.push(announcement);
760        }
761
762        Ok(announcements)
763    }
764
765    async fn sign_fail_on_error<R>(
766        &self,
767        accepted_contract: AcceptedContract,
768        sign_message: SignDlc,
769        e: Error,
770    ) -> Result<R, Error> {
771        log_error!(
772            self.logger,
773            "Error in on_sign_message marking contract as failed sign. contract_id={} error={}",
774            accepted_contract.get_contract_id_string(),
775            e.to_string()
776        );
777        self.store
778            .update_contract(&Contract::FailedSign(FailedSignContract {
779                accepted_contract,
780                sign_message,
781                error_message: e.to_string(),
782            }))
783            .await?;
784        Err(e)
785    }
786
787    async fn accept_fail_on_error<R>(
788        &self,
789        offered_contract: OfferedContract,
790        accept_message: AcceptDlc,
791        e: Error,
792    ) -> Result<R, Error> {
793        log_error!(
794            self.logger,
795            "Error in on_accept_message marking contract as failed accept. contract_id={} error={}",
796            offered_contract.id.to_lower_hex_string(),
797            e.to_string()
798        );
799        self.store
800            .update_contract(&Contract::FailedAccept(FailedAcceptContract {
801                offered_contract,
802                accept_message,
803                error_message: e.to_string(),
804            }))
805            .await?;
806        Err(e)
807    }
808
809    #[tracing::instrument(skip_all, level = "debug")]
810    async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
811        let confirmations = self
812            .blockchain
813            .get_transaction_confirmations(
814                &contract
815                    .accepted_contract
816                    .dlc_transactions
817                    .fund
818                    .compute_txid(),
819            )
820            .await?;
821        if confirmations >= NB_CONFIRMATIONS {
822            log_info!(
823                self.logger,
824                "Marking signed contract as confirmed. confirmations={} contract_id={}",
825                confirmations,
826                contract.accepted_contract.get_contract_id_string(),
827            );
828            self.store
829                .update_contract(&Contract::Confirmed(contract.clone()))
830                .await?;
831        } else {
832            log_debug!(self.logger,
833                "Not enough confirmations to mark contract as confirmed. confirmations={} required={} contract_id={}", 
834                confirmations,
835                NB_CONFIRMATIONS,
836                contract.accepted_contract.get_contract_id_string(),
837            );
838        }
839        Ok(())
840    }
841
842    #[tracing::instrument(skip_all, level = "debug")]
843    async fn check_signed_contracts(
844        &self,
845        signed_contracts: &[SignedContract],
846    ) -> Result<(), Error> {
847        for c in signed_contracts {
848            if let Err(e) = self.check_signed_contract(c).await {
849                log_error!(
850                    self.logger,
851                    "Error checking signed contract. contract_id={} error={}",
852                    c.accepted_contract.get_contract_id_string(),
853                    e.to_string()
854                )
855            }
856        }
857
858        Ok(())
859    }
860
861    #[tracing::instrument(skip_all, level = "debug")]
862    async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
863        let contracts = self.get_store().get_signed_contracts().await?;
864        for contract in &contracts {
865            let dlc_input = contract
866                .accepted_contract
867                .offered_contract
868                .funding_inputs
869                .iter()
870                .find(|d| d.dlc_input.is_some());
871
872            let Some(funding_input) = dlc_input else {
873                continue;
874            };
875
876            let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
877
878            let confirmed_contract_in_splice = match get_contract_in_state!(
879                self,
880                &contract_id,
881                Confirmed,
882                None as Option<PublicKey>
883            ) {
884                Ok(c) => Ok(c),
885                Err(Error::InvalidState(e)) => {
886                    log_trace!(self.logger,
887                        "The previouse contract referenced in a splice transaction is in an unexpected state. contract_id={} error={}", 
888                        contract_id.to_lower_hex_string(), e.to_string(),
889                    );
890                    continue;
891                }
892                Err(e) => {
893                    log_debug!(self.logger,
894                        "The previouse contract referenced in a splice transaction failed to retrieve. contract_id={} error={}", 
895                        contract_id.to_lower_hex_string(), e.to_string(),
896                    );
897                    Err(e)
898                }
899            }?;
900
901            let preclosed_contract = PreClosedContract {
902                signed_contract: confirmed_contract_in_splice.clone(),
903                attestations: None,
904                signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
905            };
906
907            log_debug!(self.logger,
908                "The previous contract that was spliced is now closed because the splice contract is confirmed. contract_id={}", 
909                contract_id.to_lower_hex_string(),
910            );
911
912            self.store
913                .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
914                .await?;
915        }
916        Ok(contracts)
917    }
918
919    #[tracing::instrument(skip_all, level = "debug")]
920    async fn check_confirmed_contracts(&self) -> Result<(), Error> {
921        for c in self.store.get_confirmed_contracts().await? {
922            // Confirmed contracts from channel are processed in channel specific methods.
923            if c.channel_id.is_some() {
924                continue;
925            }
926            if let Err(e) = self.check_confirmed_contract(&c).await {
927                log_error!(
928                    self.logger,
929                    "Error checking confirmed contract. contract_id={} error={}",
930                    c.accepted_contract.get_contract_id_string(),
931                    e.to_string()
932                )
933            }
934        }
935
936        Ok(())
937    }
938
939    #[tracing::instrument(skip_all, level = "debug")]
940    async fn get_closable_contract_info<'a>(
941        &'a self,
942        contract: &'a SignedContract,
943    ) -> ClosableContractInfo<'a> {
944        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
945        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
946        for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
947            log_debug!(
948                self.logger,
949                "Checking contract for oracle maturation. contract_id={}",
950                contract.accepted_contract.get_contract_id_string()
951            );
952            let matured: Vec<_> = contract_info
953                .oracle_announcements
954                .iter()
955                .filter(|x| {
956                    (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
957                })
958                .enumerate()
959                .collect();
960            if matured.len() >= contract_info.threshold {
961                let attestations = stream::iter(matured.iter())
962                    .map(|(i, announcement)| async move {
963                        log_debug!(self.logger,
964                            "Oracle announcement for contract is matured. Getting attestations. contract_id={} event_id={}", 
965                            contract.accepted_contract.get_contract_id_string(),
966                            announcement.oracle_event.event_id
967                        );
968                        // First try to get the oracle
969                        let oracle = match self.oracles.get(&announcement.oracle_public_key) {
970                            Some(oracle) => oracle,
971                            None => {
972                                log_debug!(self.logger,
973                                    "Oracle not found. pubkey={}. contract_id={} event_id={}", 
974                                    announcement.oracle_public_key,
975                                    contract.accepted_contract.get_contract_id_string(),
976                                    announcement.oracle_event.event_id
977                                );
978                                return None;
979                            }
980                        };
981                        // Then try to get the attestation
982                        let attestation = match oracle
983                            .get_attestation(&announcement.oracle_event.event_id)
984                            .await
985                        {
986                            Ok(attestation) => attestation,
987                            Err(e) => {
988                                log_error!(self.logger,
989                                    "Attestation not found for event. pubkey={} event_id={} error={}",
990                                    announcement.oracle_public_key,
991                                    announcement.oracle_event.event_id,
992                                    e.to_string()
993                                );
994                                return None;
995                            }
996                        };
997                        // Validate the attestation
998                        if let Err(e) = attestation.validate(&self.secp, announcement) {
999                            log_error!(self.logger,
1000                                "Oracle attestation is not valid. pubkey={} event_id={}, error={}",
1001                                announcement.oracle_public_key,
1002                                announcement.oracle_event.event_id,
1003                                e.to_string()
1004                            );
1005                            return None;
1006                        }
1007                        log_info!(self.logger,
1008                            "Retrieved a valid attestation. pubkey={} event_id={} outcomes={:?}", 
1009                            announcement.oracle_public_key,
1010                            announcement.oracle_event.event_id,
1011                            attestation.outcomes
1012                        );
1013                        Some((*i, attestation))
1014                    })
1015                    .collect::<FuturesUnordered<_>>()
1016                    .await
1017                    .filter_map(|result| async move { result }) // Filter out None values
1018                    .collect::<Vec<_>>()
1019                    .await;
1020                if attestations.len() >= contract_info.threshold {
1021                    log_info!(self.logger,
1022                        "Found enough attestations to close contract. contract_id={} attestations={}", 
1023                        contract.accepted_contract.get_contract_id_string(),
1024                        attestations.len()
1025                    );
1026                    return Some((contract_info, adaptor_info, attestations));
1027                }
1028            }
1029        }
1030        None
1031    }
1032
1033    #[tracing::instrument(skip_all, level = "debug")]
1034    async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
1035        log_debug!(
1036            self.logger,
1037            "Checking confirmed contract. contract_id={}",
1038            contract.accepted_contract.get_contract_id_string()
1039        );
1040        let closable_contract_info = self.get_closable_contract_info(contract).await;
1041        if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
1042            log_debug!(
1043                self.logger,
1044                "Found closable contract info. contract_id={} attestations={}",
1045                contract.accepted_contract.get_contract_id_string(),
1046                attestations.len()
1047            );
1048            let offer = &contract.accepted_contract.offered_contract;
1049            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1050
1051            //  === WARNING ===
1052            // This code could potentially be problematic. When running refund tests, it would look for a CET
1053            // but the CET would be invalid and refund would not pass. By only updating with a valid CET,
1054            // we then go to update. This way if it fails we can check for refund instead of bailing and getting locked
1055            // funds.
1056            if let Ok(cet) = crate::contract_updater::get_signed_cet(
1057                &self.secp,
1058                contract,
1059                contract_info,
1060                adaptor_info,
1061                &attestations,
1062                &signer,
1063                &self.logger,
1064            ) {
1065                log_info!(
1066                    self.logger,
1067                    "Found valid CET. Closing contract. contract_id={}",
1068                    contract.accepted_contract.get_contract_id_string()
1069                );
1070                match self
1071                    .close_contract(
1072                        contract,
1073                        cet,
1074                        attestations.iter().map(|x| x.1.clone()).collect(),
1075                    )
1076                    .await
1077                {
1078                    Ok(closed_contract) => {
1079                        log_info!(
1080                            self.logger,
1081                            "Updated contract to closed. contract_id={}",
1082                            contract.accepted_contract.get_contract_id_string()
1083                        );
1084                        self.store.update_contract(&closed_contract).await?;
1085                        return Ok(());
1086                    }
1087                    Err(e) => {
1088                        log_warn!(
1089                            self.logger,
1090                            "Failed to close contract. contract_id={} error={}",
1091                            contract.accepted_contract.get_contract_id_string(),
1092                            e.to_string()
1093                        );
1094                        return Err(e);
1095                    }
1096                }
1097            }
1098        }
1099
1100        // Check each pending close transaction
1101        for pending_close_tx in &contract
1102            .accepted_contract
1103            .dlc_transactions
1104            .pending_close_txs
1105        {
1106            let confirmations = self
1107                .blockchain
1108                .get_transaction_confirmations(&pending_close_tx.compute_txid())
1109                .await?;
1110
1111            log_debug!(
1112                self.logger,
1113                "Checking pending close transaction. close_txid={} contract_id={} confirmations={}",
1114                pending_close_tx.compute_txid().to_string(),
1115                contract.accepted_contract.get_contract_id_string(),
1116                confirmations
1117            );
1118
1119            if confirmations >= NB_CONFIRMATIONS {
1120                // Found a fully confirmed pending close - move directly to Closed
1121                log_info!(self.logger,
1122                    "Pending close transaction is fully confirmed. Moving to closed. close_txid={} contract_id={}", 
1123                    pending_close_tx.compute_txid().to_string(),
1124                    contract.accepted_contract.get_contract_id_string()
1125                );
1126                let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
1127                let closed_contract = ClosedContract {
1128                    attestations: None, // Cooperative close has no attestations
1129                    signed_cet: Some(pending_close_tx.clone()), // Cooperative close doesn't use a CET
1130                    contract_id: contract.accepted_contract.get_contract_id(),
1131                    temporary_contract_id: contract.accepted_contract.offered_contract.id,
1132                    counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1133                    pnl,
1134                    funding_txid: contract
1135                        .accepted_contract
1136                        .dlc_transactions
1137                        .fund
1138                        .compute_txid(),
1139                };
1140
1141                self.store
1142                    .update_contract(&Contract::Closed(closed_contract))
1143                    .await?;
1144                break; // Only one close can be confirmed
1145            } else if confirmations >= 1 {
1146                // Found a confirmed but not fully confirmed pending close - move to PreClosed
1147                log_debug!(self.logger,
1148                    "Found a confirmed but not fully confirmed pending close. Moving to preclosed. close_txid={} contract_id={}", 
1149                    pending_close_tx.compute_txid().to_string(),
1150                    contract.accepted_contract.get_contract_id_string()
1151                );
1152                let preclosed_contract = PreClosedContract {
1153                    signed_contract: contract.clone(),
1154                    attestations: None, // Cooperative close has no attestations
1155                    signed_cet: pending_close_tx.clone(),
1156                };
1157
1158                self.store
1159                    .update_contract(&Contract::PreClosed(preclosed_contract))
1160                    .await?;
1161                break; // Only one close can be confirmed
1162            }
1163        }
1164
1165        self.check_refund(contract).await?;
1166
1167        Ok(())
1168    }
1169
1170    /// Manually close a contract with the oracle attestations.
1171    #[tracing::instrument(skip_all, level = "debug")]
1172    pub async fn close_confirmed_contract(
1173        &self,
1174        contract_id: &ContractId,
1175        attestations: Vec<(usize, OracleAttestation)>,
1176    ) -> Result<Contract, Error> {
1177        log_info!(
1178            self.logger,
1179            "Attempting to close confirmed contract manually. contract_id={} outcomes={:?}",
1180            contract_id.to_lower_hex_string(),
1181            attestations
1182                .iter()
1183                .map(|(_, a)| a.outcomes.clone())
1184                .collect::<Vec<_>>()
1185        );
1186        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1187        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
1188        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
1189
1190        // find the contract info that matches the attestations
1191        if let Some((contract_info, adaptor_info)) =
1192            contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
1193                let matches = attestations
1194                    .iter()
1195                    .filter(|(i, a)| {
1196                        c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
1197                    })
1198                    .count();
1199
1200                matches >= c.threshold
1201            })
1202        {
1203            let offer = &contract.accepted_contract.offered_contract;
1204            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1205            log_debug!(
1206                self.logger,
1207                "Getting signed CET. contract_id={}",
1208                contract.accepted_contract.get_contract_id_string()
1209            );
1210            let cet = crate::contract_updater::get_signed_cet(
1211                &self.secp,
1212                &contract,
1213                contract_info,
1214                adaptor_info,
1215                &attestations,
1216                &signer,
1217                &self.logger,
1218            )?;
1219
1220            // Check that the lock time has passed
1221            let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
1222                .expect("Time is not in valid range. This should never happen.");
1223            let height =
1224                Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
1225                    .expect("Height is not in valid range. This should never happen.");
1226            let locktime = cet.lock_time;
1227
1228            if !locktime.is_satisfied_by(height, time) {
1229                return Err(Error::InvalidState(
1230                    "CET lock time has not passed yet".to_string(),
1231                ));
1232            }
1233
1234            match self
1235                .close_contract(
1236                    &contract,
1237                    cet,
1238                    attestations.into_iter().map(|x| x.1).collect(),
1239                )
1240                .await
1241            {
1242                Ok(closed_contract) => {
1243                    log_info!(
1244                        self.logger,
1245                        "Closed contract manually. contract_id={}",
1246                        contract.accepted_contract.get_contract_id_string()
1247                    );
1248                    self.store.update_contract(&closed_contract).await?;
1249                    Ok(closed_contract)
1250                }
1251                Err(e) => {
1252                    log_error!(
1253                        self.logger,
1254                        "Failed to close contract. contract_id={} error={}",
1255                        contract.accepted_contract.get_contract_id_string(),
1256                        e.to_string()
1257                    );
1258                    Err(e)
1259                }
1260            }
1261        } else {
1262            Err(Error::InvalidState(
1263                "Attestations did not match contract infos".to_string(),
1264            ))
1265        }
1266    }
1267
1268    #[tracing::instrument(skip_all, level = "debug")]
1269    async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1270        for c in self.store.get_preclosed_contracts().await? {
1271            if let Err(e) = self.check_preclosed_contract(&c).await {
1272                log_error!(
1273                    self.logger,
1274                    "Error checking pre-closed contract. contract_id={} error={}",
1275                    c.signed_contract.accepted_contract.get_contract_id_string(),
1276                    e
1277                )
1278            }
1279        }
1280
1281        Ok(())
1282    }
1283
1284    #[tracing::instrument(skip_all, level = "debug")]
1285    async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1286        let broadcasted_txid = contract.signed_cet.compute_txid();
1287        let confirmations = self
1288            .blockchain
1289            .get_transaction_confirmations(&broadcasted_txid)
1290            .await?;
1291        log_debug!(
1292            self.logger,
1293            "Checking pre-closed contract. broadcasted_txid={} contract_id={} confirmations={}",
1294            broadcasted_txid.to_string(),
1295            contract
1296                .signed_contract
1297                .accepted_contract
1298                .get_contract_id_string(),
1299            confirmations
1300        );
1301        if confirmations >= NB_CONFIRMATIONS {
1302            log_debug!(self.logger,
1303                "Pre-closed contract is fully confirmed. Moving to closed. broadcasted_txid={} contract_id={}", 
1304                broadcasted_txid.to_string(),
1305                contract.signed_contract.accepted_contract.get_contract_id_string()
1306            );
1307            let pnl = contract
1308                .signed_contract
1309                .accepted_contract
1310                .compute_pnl(&contract.signed_cet);
1311
1312            let closed_contract = ClosedContract {
1313                attestations: contract.attestations.clone(),
1314                signed_cet: Some(contract.signed_cet.clone()),
1315                contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1316                temporary_contract_id: contract
1317                    .signed_contract
1318                    .accepted_contract
1319                    .offered_contract
1320                    .id,
1321                counter_party_id: contract
1322                    .signed_contract
1323                    .accepted_contract
1324                    .offered_contract
1325                    .counter_party,
1326                funding_txid: contract
1327                    .signed_contract
1328                    .accepted_contract
1329                    .dlc_transactions
1330                    .fund
1331                    .compute_txid(),
1332                pnl,
1333            };
1334            self.store
1335                .update_contract(&Contract::Closed(closed_contract))
1336                .await?;
1337        }
1338
1339        Ok(())
1340    }
1341
1342    #[tracing::instrument(skip_all, level = "debug")]
1343    async fn close_contract(
1344        &self,
1345        contract: &SignedContract,
1346        signed_cet: Transaction,
1347        attestations: Vec<OracleAttestation>,
1348    ) -> Result<Contract, Error> {
1349        let confirmations = self
1350            .blockchain
1351            .get_transaction_confirmations(&signed_cet.compute_txid())
1352            .await?;
1353
1354        if confirmations < 1 {
1355            log_info!(
1356                self.logger,
1357                "Broadcasting signed CET. txid={} contract_id={}",
1358                signed_cet.compute_txid().to_string(),
1359                contract.accepted_contract.get_contract_id_string()
1360            );
1361            // TODO(tibo): if this fails because another tx is already in
1362            // mempool or blockchain, we might have been cheated. There is
1363            // not much to be done apart from possibly extracting a fraud
1364            // proof but ideally it should be handled.
1365            self.blockchain.send_transaction(&signed_cet).await?;
1366
1367            let preclosed_contract = PreClosedContract {
1368                signed_contract: contract.clone(),
1369                attestations: Some(attestations),
1370                signed_cet,
1371            };
1372
1373            return Ok(Contract::PreClosed(preclosed_contract));
1374        } else if confirmations < NB_CONFIRMATIONS {
1375            log_debug!(self.logger,
1376                "Found a confirmed but not fully confirmed pending close. Moving to preclosed. txid={} contract_id={}", 
1377                signed_cet.compute_txid().to_string(),
1378                contract.accepted_contract.get_contract_id_string()
1379            );
1380            let preclosed_contract = PreClosedContract {
1381                signed_contract: contract.clone(),
1382                attestations: Some(attestations),
1383                signed_cet,
1384            };
1385
1386            return Ok(Contract::PreClosed(preclosed_contract));
1387        }
1388
1389        let closed_contract = ClosedContract {
1390            attestations: Some(attestations.to_vec()),
1391            pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1392            signed_cet: Some(signed_cet),
1393            contract_id: contract.accepted_contract.get_contract_id(),
1394            temporary_contract_id: contract.accepted_contract.offered_contract.id,
1395            funding_txid: contract
1396                .accepted_contract
1397                .dlc_transactions
1398                .fund
1399                .compute_txid(),
1400            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1401        };
1402
1403        Ok(Contract::Closed(closed_contract))
1404    }
1405
1406    // TODO: Make this public to refund
1407    #[tracing::instrument(skip_all, level = "debug")]
1408    async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1409        // TODO(tibo): should check for confirmation of refund before updating state
1410        if contract
1411            .accepted_contract
1412            .dlc_transactions
1413            .refund
1414            .lock_time
1415            .to_consensus_u32() as u64
1416            <= self.time.unix_time_now()
1417        {
1418            log_debug!(self.logger,
1419                "Refund locktime has passed. Attempting to broadcast refund. refund_txid={} contract_id={}", 
1420                contract.accepted_contract.dlc_transactions.refund.compute_txid().to_string(),
1421                contract.accepted_contract.get_contract_id_string()
1422            );
1423            let accepted_contract = &contract.accepted_contract;
1424            let refund = accepted_contract.dlc_transactions.refund.clone();
1425            let confirmations = self
1426                .blockchain
1427                .get_transaction_confirmations(&refund.compute_txid())
1428                .await?;
1429            if confirmations == 0 {
1430                log_debug!(self.logger,
1431                    "Refund transaction has not been broadcast yet. Sending transaction txid={} contract_id={}", 
1432                    refund.compute_txid().to_string(),
1433                    contract.accepted_contract.get_contract_id_string()
1434                );
1435                let offer = &contract.accepted_contract.offered_contract;
1436                let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1437                let refund = crate::contract_updater::get_signed_refund(
1438                    &self.secp,
1439                    contract,
1440                    &signer,
1441                    &self.logger,
1442                )?;
1443                self.blockchain.send_transaction(&refund).await?;
1444            }
1445
1446            self.store
1447                .update_contract(&Contract::Refunded(contract.clone()))
1448                .await?;
1449        }
1450
1451        Ok(())
1452    }
1453
1454    /// Function to call when we detect that a contract was closed by our counter party.
1455    /// This will update the state of the contract and return the [`Contract`] object.
1456    #[tracing::instrument(skip_all, level = "debug")]
1457    pub async fn on_counterparty_close(
1458        &mut self,
1459        contract: &SignedContract,
1460        closing_tx: Transaction,
1461        confirmations: u32,
1462    ) -> Result<Contract, Error> {
1463        // check if the closing tx actually spends the funding output
1464        if !closing_tx.input.iter().any(|i| {
1465            i.previous_output
1466                == contract
1467                    .accepted_contract
1468                    .dlc_transactions
1469                    .get_fund_outpoint()
1470        }) {
1471            log_error!(
1472                self.logger,
1473                "Closing tx does not spend the funding tx. txid={} contract_id={}",
1474                closing_tx.compute_txid().to_string(),
1475                contract.accepted_contract.get_contract_id_string()
1476            );
1477            return Err(Error::InvalidParameters(
1478                "Closing tx does not spend the funding tx".to_string(),
1479            ));
1480        }
1481
1482        // check if it is the refund tx (easy case)
1483        if contract
1484            .accepted_contract
1485            .dlc_transactions
1486            .refund
1487            .compute_txid()
1488            == closing_tx.compute_txid()
1489        {
1490            log_debug!(
1491                self.logger,
1492                "Closing tx is the refund tx. Moving to refunded. txid={} contract_id={}",
1493                closing_tx.compute_txid().to_string(),
1494                contract.accepted_contract.get_contract_id_string()
1495            );
1496            let refunded = Contract::Refunded(contract.clone());
1497            self.store.update_contract(&refunded).await?;
1498            return Ok(refunded);
1499        }
1500
1501        let contract = if confirmations < NB_CONFIRMATIONS {
1502            log_info!(self.logger,
1503                "Closing transaction is not fully confirmed. Moving to preclosed. txid={} contract_id={}", 
1504                closing_tx.compute_txid().to_string(),
1505                contract.accepted_contract.get_contract_id_string()
1506            );
1507            Contract::PreClosed(PreClosedContract {
1508                signed_contract: contract.clone(),
1509                attestations: None, // todo in some cases we can get the attestations from the closing tx
1510                signed_cet: closing_tx,
1511            })
1512        } else {
1513            log_info!(
1514                self.logger,
1515                "Closing transaction is fully confirmed. Moving to closed. txid={} contract_id={}",
1516                closing_tx.compute_txid().to_string(),
1517                contract.accepted_contract.get_contract_id_string()
1518            );
1519            Contract::Closed(ClosedContract {
1520                attestations: None, // todo in some cases we can get the attestations from the closing tx
1521                pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1522                signed_cet: Some(closing_tx),
1523                contract_id: contract.accepted_contract.get_contract_id(),
1524                temporary_contract_id: contract.accepted_contract.offered_contract.id,
1525                counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1526                funding_txid: contract
1527                    .accepted_contract
1528                    .dlc_transactions
1529                    .fund
1530                    .compute_txid(),
1531            })
1532        };
1533
1534        self.store.update_contract(&contract).await?;
1535
1536        Ok(contract)
1537    }
1538
1539    /// Initiates a cooperative close of a contract by creating and signing a closing transaction.
1540    /// Returns a CloseDlc message to be sent to the counter party.
1541    /// The contract remains in Confirmed state until the close transaction is broadcast.
1542    #[tracing::instrument(skip_all, level = "debug")]
1543    pub async fn cooperative_close_contract(
1544        &self,
1545        contract_id: &ContractId,
1546        counter_payout: Amount,
1547    ) -> Result<(CloseDlc, PublicKey), Error> {
1548        let signed_contract =
1549            get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1550
1551        let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1552            &self.secp,
1553            &signed_contract,
1554            counter_payout,
1555            &self.signer_provider,
1556            &self.logger,
1557        )?;
1558
1559        // Create updated contract with pending close transaction
1560        let mut updated_dlc_transactions =
1561            signed_contract.accepted_contract.dlc_transactions.clone();
1562        updated_dlc_transactions
1563            .pending_close_txs
1564            .push(close_tx.clone());
1565        log_debug!(
1566            self.logger,
1567            "Created updated contract with pending close transaction. close_txid={} contract_id={}",
1568            close_tx.compute_txid().to_string(),
1569            contract_id.to_lower_hex_string()
1570        );
1571
1572        let updated_accepted_contract = AcceptedContract {
1573            dlc_transactions: updated_dlc_transactions,
1574            ..signed_contract.accepted_contract.clone()
1575        };
1576
1577        let updated_signed_contract = SignedContract {
1578            accepted_contract: updated_accepted_contract,
1579            ..signed_contract.clone()
1580        };
1581
1582        // Update contract state to track pending close
1583        self.store
1584            .update_contract(&Contract::Confirmed(updated_signed_contract))
1585            .await?;
1586
1587        let counter_party = signed_contract
1588            .accepted_contract
1589            .offered_contract
1590            .counter_party;
1591
1592        Ok((close_message, counter_party))
1593    }
1594
1595    /// Accepts a cooperative close request by completing the close transaction
1596    /// and broadcasting it to the network.
1597    #[tracing::instrument(skip_all, level = "debug")]
1598    pub async fn accept_cooperative_close(
1599        &self,
1600        contract_id: &ContractId,
1601        close_message: &CloseDlc,
1602    ) -> Result<(), Error> {
1603        let signed_contract =
1604            get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1605
1606        let close_tx = crate::contract_updater::complete_cooperative_close(
1607            &self.secp,
1608            &signed_contract,
1609            close_message,
1610            &self.signer_provider,
1611            &self.logger,
1612        )?;
1613
1614        log_debug!(
1615            self.logger,
1616            "Completed cooperative close. close_txid={} contract_id={}",
1617            close_tx.compute_txid().to_string(),
1618            contract_id.to_lower_hex_string()
1619        );
1620        // Broadcast the closing transaction
1621        self.blockchain.send_transaction(&close_tx).await?;
1622
1623        // Create PreClosed contract (transaction broadcast but not confirmed yet)
1624        let preclosed_contract = PreClosedContract {
1625            signed_contract,
1626            attestations: None,
1627            signed_cet: close_tx,
1628        };
1629
1630        self.store
1631            .update_contract(&Contract::PreClosed(preclosed_contract))
1632            .await?;
1633
1634        Ok(())
1635    }
1636}
1637
1638impl<
1639        W: Deref,
1640        SP: Deref,
1641        B: Deref,
1642        S: Deref,
1643        O: Deref,
1644        T: Deref,
1645        F: Deref,
1646        X: ContractSigner,
1647        L: Deref,
1648    > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
1649where
1650    W::Target: Wallet,
1651    SP::Target: ContractSignerProvider<Signer = X>,
1652    B::Target: Blockchain,
1653    S::Target: Storage,
1654    O::Target: Oracle,
1655    T::Target: Time,
1656    F::Target: FeeEstimator,
1657    L::Target: Logger,
1658{
1659    /// Create a new channel offer and return the [`dlc_messages::channel::OfferChannel`]
1660    /// message to be sent to the `counter_party`.
1661    pub async fn offer_channel(
1662        &self,
1663        contract_input: &ContractInput,
1664        counter_party: PublicKey,
1665    ) -> Result<OfferChannel, Error> {
1666        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1667
1668        let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1669            &self.secp,
1670            contract_input,
1671            &counter_party,
1672            &oracle_announcements,
1673            CET_NSEQUENCE,
1674            REFUND_DELAY,
1675            &self.wallet,
1676            &self.signer_provider,
1677            &self.blockchain,
1678            &self.time,
1679            crate::utils::get_new_temporary_id(),
1680        )
1681        .await?;
1682
1683        let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1684
1685        self.store
1686            .upsert_channel(
1687                Channel::Offered(offered_channel),
1688                Some(Contract::Offered(offered_contract)),
1689            )
1690            .await?;
1691
1692        Ok(msg)
1693    }
1694
1695    /// Reject a channel that was offered. Returns the [`dlc_messages::channel::Reject`]
1696    /// message to be sent as well as the public key of the offering node.
1697    pub async fn reject_channel(
1698        &self,
1699        channel_id: &ChannelId,
1700    ) -> Result<(Reject, PublicKey), Error> {
1701        let offered_channel =
1702            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1703
1704        if offered_channel.is_offer_party {
1705            return Err(Error::InvalidState(
1706                "Cannot reject channel initiated by us.".to_string(),
1707            ));
1708        }
1709
1710        let offered_contract = get_contract_in_state!(
1711            self,
1712            &offered_channel.offered_contract_id,
1713            Offered,
1714            None as Option<PublicKey>
1715        )?;
1716
1717        let counterparty = offered_channel.counter_party;
1718        self.store
1719            .upsert_channel(
1720                Channel::Cancelled(offered_channel),
1721                Some(Contract::Rejected(offered_contract)),
1722            )
1723            .await?;
1724
1725        let msg = Reject {
1726            channel_id: *channel_id,
1727        };
1728        Ok((msg, counterparty))
1729    }
1730
1731    /// Accept a channel that was offered. Returns the [`dlc_messages::channel::AcceptChannel`]
1732    /// message to be sent, the updated [`crate::ChannelId`] and [`crate::ContractId`],
1733    /// as well as the public key of the offering node.
1734    pub async fn accept_channel(
1735        &self,
1736        channel_id: &ChannelId,
1737    ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1738        let offered_channel =
1739            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1740
1741        if offered_channel.is_offer_party {
1742            return Err(Error::InvalidState(
1743                "Cannot accept channel initiated by us.".to_string(),
1744            ));
1745        }
1746
1747        let offered_contract = get_contract_in_state!(
1748            self,
1749            &offered_channel.offered_contract_id,
1750            Offered,
1751            None as Option<PublicKey>
1752        )?;
1753
1754        let (accepted_channel, accepted_contract, accept_channel) =
1755            crate::channel_updater::accept_channel_offer(
1756                &self.secp,
1757                &offered_channel,
1758                &offered_contract,
1759                &self.wallet,
1760                &self.signer_provider,
1761                &self.blockchain,
1762            )
1763            .await?;
1764
1765        self.wallet.import_address(&Address::p2wsh(
1766            &accepted_contract.dlc_transactions.funding_script_pubkey,
1767            self.blockchain.get_network()?,
1768        ))?;
1769
1770        let channel_id = accepted_channel.channel_id;
1771        let contract_id = accepted_contract.get_contract_id();
1772        let counter_party = accepted_contract.offered_contract.counter_party;
1773
1774        self.store
1775            .upsert_channel(
1776                Channel::Accepted(accepted_channel),
1777                Some(Contract::Accepted(accepted_contract)),
1778            )
1779            .await?;
1780
1781        Ok((accept_channel, channel_id, contract_id, counter_party))
1782    }
1783
1784    /// Force close the channel with given [`crate::ChannelId`].
1785    pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1786        let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1787
1788        self.force_close_channel_internal(channel, true).await
1789    }
1790
1791    /// Offer to settle the balance of a channel so that the counter party gets
1792    /// `counter_payout`. Returns the [`dlc_messages::channel::SettleChannelOffer`]
1793    /// message to be sent and the public key of the counter party node.
1794    pub async fn settle_offer(
1795        &self,
1796        channel_id: &ChannelId,
1797        counter_payout: Amount,
1798    ) -> Result<(SettleOffer, PublicKey), Error> {
1799        let mut signed_channel =
1800            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1801
1802        let msg = crate::channel_updater::settle_channel_offer(
1803            &self.secp,
1804            &mut signed_channel,
1805            counter_payout,
1806            PEER_TIMEOUT,
1807            &self.signer_provider,
1808            &self.time,
1809        )?;
1810
1811        let counter_party = signed_channel.counter_party;
1812
1813        self.store
1814            .upsert_channel(Channel::Signed(signed_channel), None)
1815            .await?;
1816
1817        Ok((msg, counter_party))
1818    }
1819
1820    /// Accept a settlement offer, returning the [`SettleAccept`] message to be
1821    /// sent to the node with the returned [`PublicKey`] id.
1822    pub async fn accept_settle_offer(
1823        &self,
1824        channel_id: &ChannelId,
1825    ) -> Result<(SettleAccept, PublicKey), Error> {
1826        let mut signed_channel =
1827            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1828
1829        let msg = crate::channel_updater::settle_channel_accept(
1830            &self.secp,
1831            &mut signed_channel,
1832            CET_NSEQUENCE,
1833            0,
1834            PEER_TIMEOUT,
1835            &self.signer_provider,
1836            &self.time,
1837            &self.chain_monitor,
1838        )
1839        .await?;
1840
1841        let counter_party = signed_channel.counter_party;
1842
1843        self.store
1844            .upsert_channel(Channel::Signed(signed_channel), None)
1845            .await?;
1846
1847        Ok((msg, counter_party))
1848    }
1849
1850    /// Returns a [`RenewOffer`] message as well as the [`PublicKey`] of the
1851    /// counter party's node to offer the establishment of a new contract in the
1852    /// channel.
1853    pub async fn renew_offer(
1854        &self,
1855        channel_id: &ChannelId,
1856        counter_payout: Amount,
1857        contract_input: &ContractInput,
1858    ) -> Result<(RenewOffer, PublicKey), Error> {
1859        let mut signed_channel =
1860            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1861
1862        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1863
1864        let (msg, offered_contract) = crate::channel_updater::renew_offer(
1865            &self.secp,
1866            &mut signed_channel,
1867            contract_input,
1868            oracle_announcements,
1869            counter_payout,
1870            REFUND_DELAY,
1871            PEER_TIMEOUT,
1872            CET_NSEQUENCE,
1873            &self.signer_provider,
1874            &self.time,
1875        )?;
1876
1877        let counter_party = offered_contract.counter_party;
1878
1879        self.store
1880            .upsert_channel(
1881                Channel::Signed(signed_channel),
1882                Some(Contract::Offered(offered_contract)),
1883            )
1884            .await?;
1885
1886        Ok((msg, counter_party))
1887    }
1888
1889    /// Accept an offer to renew the contract in the channel. Returns the
1890    /// [`RenewAccept`] message to be sent to the peer with the returned
1891    /// [`PublicKey`] as node id.
1892    pub async fn accept_renew_offer(
1893        &self,
1894        channel_id: &ChannelId,
1895    ) -> Result<(RenewAccept, PublicKey), Error> {
1896        let mut signed_channel =
1897            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1898        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1899            Error::InvalidState("Expected to have a contract id but did not.".to_string())
1900        })?;
1901
1902        let offered_contract = get_contract_in_state!(
1903            self,
1904            &offered_contract_id,
1905            Offered,
1906            None as Option<PublicKey>
1907        )?;
1908
1909        let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1910            &self.secp,
1911            &mut signed_channel,
1912            &offered_contract,
1913            CET_NSEQUENCE,
1914            PEER_TIMEOUT,
1915            &self.signer_provider,
1916            &self.time,
1917        )?;
1918
1919        let counter_party = signed_channel.counter_party;
1920
1921        self.store
1922            .upsert_channel(
1923                Channel::Signed(signed_channel),
1924                Some(Contract::Accepted(accepted_contract)),
1925            )
1926            .await?;
1927
1928        Ok((msg, counter_party))
1929    }
1930
1931    /// Reject an offer to renew the contract in the channel. Returns the
1932    /// [`Reject`] message to be sent to the peer with the returned
1933    /// [`PublicKey`] node id.
1934    pub async fn reject_renew_offer(
1935        &self,
1936        channel_id: &ChannelId,
1937    ) -> Result<(Reject, PublicKey), Error> {
1938        let mut signed_channel =
1939            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1940        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1941            Error::InvalidState(
1942                "Expected to be in a state with an associated contract id but was not.".to_string(),
1943            )
1944        })?;
1945
1946        let offered_contract = get_contract_in_state!(
1947            self,
1948            &offered_contract_id,
1949            Offered,
1950            None as Option<PublicKey>
1951        )?;
1952
1953        let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1954
1955        let counter_party = signed_channel.counter_party;
1956
1957        self.store
1958            .upsert_channel(
1959                Channel::Signed(signed_channel),
1960                Some(Contract::Rejected(offered_contract)),
1961            )
1962            .await?;
1963
1964        Ok((reject_msg, counter_party))
1965    }
1966
1967    /// Returns a [`Reject`] message to be sent to the counter party of the
1968    /// channel to inform them that the local party does not wish to accept the
1969    /// proposed settle offer.
1970    pub async fn reject_settle_offer(
1971        &self,
1972        channel_id: &ChannelId,
1973    ) -> Result<(Reject, PublicKey), Error> {
1974        let mut signed_channel =
1975            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1976
1977        let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1978
1979        let counter_party = signed_channel.counter_party;
1980
1981        self.store
1982            .upsert_channel(Channel::Signed(signed_channel), None)
1983            .await?;
1984
1985        Ok((msg, counter_party))
1986    }
1987
1988    /// Returns a [`CollaborativeCloseOffer`] message to be sent to the counter
1989    /// party of the channel and update the state of the channel. Note that the
1990    /// channel will be forced closed after a timeout if the counter party does
1991    /// not broadcast the close transaction.
1992    pub async fn offer_collaborative_close(
1993        &self,
1994        channel_id: &ChannelId,
1995        counter_payout: Amount,
1996        additional_inputs: Vec<OutPoint>,
1997    ) -> Result<CollaborativeCloseOffer, Error> {
1998        let mut signed_channel =
1999            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2000
2001        let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
2002            &self.secp,
2003            &mut signed_channel,
2004            counter_payout,
2005            additional_inputs,
2006            &self.signer_provider,
2007            &self.time,
2008        )?;
2009
2010        self.chain_monitor.lock().await.add_tx(
2011            close_tx.compute_txid(),
2012            ChannelInfo {
2013                channel_id: *channel_id,
2014                tx_type: TxType::CollaborativeClose,
2015            },
2016        );
2017
2018        self.store
2019            .upsert_channel(Channel::Signed(signed_channel), None)
2020            .await?;
2021        self.store
2022            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2023            .await?;
2024
2025        Ok(msg)
2026    }
2027
2028    /// Accept an offer to collaboratively close the channel. The close transaction
2029    /// will be broadcast and the state of the channel updated.
2030    pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
2031        let signed_channel =
2032            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2033
2034        let closed_contract = if let Some(SignedChannelState::Established {
2035            signed_contract_id,
2036            ..
2037        }) = &signed_channel.roll_back_state
2038        {
2039            let counter_payout = get_signed_channel_state!(
2040                signed_channel,
2041                CollaborativeCloseOffered,
2042                counter_payout
2043            )?;
2044            Some(
2045                self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
2046                    .await?,
2047            )
2048        } else {
2049            None
2050        };
2051
2052        let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
2053            &self.secp,
2054            &signed_channel,
2055            &self.signer_provider,
2056        )?;
2057
2058        self.blockchain.send_transaction(&close_tx).await?;
2059
2060        self.store.upsert_channel(closed_channel, None).await?;
2061
2062        if let Some(closed_contract) = closed_contract {
2063            self.store
2064                .update_contract(&Contract::Closed(closed_contract))
2065                .await?;
2066        }
2067
2068        Ok(())
2069    }
2070
2071    async fn try_finalize_closing_established_channel(
2072        &self,
2073        signed_channel: SignedChannel,
2074    ) -> Result<(), Error> {
2075        let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
2076            signed_channel,
2077            Closing,
2078            buffer_transaction,
2079            contract_id,
2080            is_initiator
2081        )?;
2082
2083        if self
2084            .blockchain
2085            .get_transaction_confirmations(&buffer_tx.compute_txid())
2086            .await?
2087            >= CET_NSEQUENCE
2088        {
2089            log_info!(
2090                self.logger,
2091                "Buffer transaction for contract {} has enough confirmations to spend from it",
2092                serialize_hex(&contract_id)
2093            );
2094
2095            let confirmed_contract =
2096                get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
2097
2098            let (contract_info, adaptor_info, attestations) = self
2099                .get_closable_contract_info(&confirmed_contract)
2100                .await
2101                .ok_or_else(|| {
2102                    Error::InvalidState("Could not get information to close contract".to_string())
2103                })?;
2104
2105            let (signed_cet, closed_channel) =
2106                crate::channel_updater::finalize_unilateral_close_settled_channel(
2107                    &self.secp,
2108                    &signed_channel,
2109                    &confirmed_contract,
2110                    contract_info,
2111                    &attestations,
2112                    adaptor_info,
2113                    &self.signer_provider,
2114                    is_initiator,
2115                )?;
2116
2117            let closed_contract = self
2118                .close_contract(
2119                    &confirmed_contract,
2120                    signed_cet,
2121                    attestations.iter().map(|x| &x.1).cloned().collect(),
2122                )
2123                .await?;
2124
2125            self.chain_monitor
2126                .lock()
2127                .await
2128                .cleanup_channel(signed_channel.channel_id);
2129
2130            self.store
2131                .upsert_channel(closed_channel, Some(closed_contract))
2132                .await?;
2133        }
2134
2135        Ok(())
2136    }
2137
2138    async fn on_offer_channel(
2139        &self,
2140        offer_channel: &OfferChannel,
2141        counter_party: PublicKey,
2142    ) -> Result<(), Error> {
2143        offer_channel.validate(
2144            &self.secp,
2145            REFUND_DELAY,
2146            REFUND_DELAY * 2,
2147            CET_NSEQUENCE,
2148            CET_NSEQUENCE * 2,
2149        )?;
2150
2151        let keys_id = self
2152            .signer_provider
2153            .derive_signer_key_id(false, offer_channel.temporary_contract_id);
2154        let (channel, contract) =
2155            OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
2156
2157        contract.validate()?;
2158
2159        if self
2160            .store
2161            .get_channel(&channel.temporary_channel_id)
2162            .await?
2163            .is_some()
2164        {
2165            return Err(Error::InvalidParameters(
2166                "Channel with identical idea already in store".to_string(),
2167            ));
2168        }
2169
2170        self.store
2171            .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
2172            .await?;
2173
2174        Ok(())
2175    }
2176
2177    async fn on_accept_channel(
2178        &self,
2179        accept_channel: &AcceptChannel,
2180        peer_id: &PublicKey,
2181    ) -> Result<SignChannel, Error> {
2182        let offered_channel = get_channel_in_state!(
2183            self,
2184            &accept_channel.temporary_channel_id,
2185            Offered,
2186            Some(*peer_id)
2187        )?;
2188        let offered_contract = get_contract_in_state!(
2189            self,
2190            &offered_channel.offered_contract_id,
2191            Offered,
2192            Some(*peer_id)
2193        )?;
2194
2195        let (signed_channel, signed_contract, sign_channel) = {
2196            let res = crate::channel_updater::verify_and_sign_accepted_channel(
2197                &self.secp,
2198                &offered_channel,
2199                &offered_contract,
2200                accept_channel,
2201                //TODO(tibo): this should be parameterizable.
2202                CET_NSEQUENCE,
2203                &self.wallet,
2204                &self.signer_provider,
2205                &self.store,
2206                &self.chain_monitor,
2207                &self.logger,
2208            )
2209            .await;
2210
2211            match res {
2212                Ok(res) => res,
2213                Err(e) => {
2214                    let channel = crate::channel::FailedAccept {
2215                        temporary_channel_id: accept_channel.temporary_channel_id,
2216                        error_message: format!("Error validating accept channel: {e}"),
2217                        accept_message: accept_channel.clone(),
2218                        counter_party: *peer_id,
2219                    };
2220                    self.store
2221                        .upsert_channel(Channel::FailedAccept(channel), None)
2222                        .await?;
2223                    return Err(e);
2224                }
2225            }
2226        };
2227
2228        self.wallet.import_address(&Address::p2wsh(
2229            &signed_contract
2230                .accepted_contract
2231                .dlc_transactions
2232                .funding_script_pubkey,
2233            self.blockchain.get_network()?,
2234        ))?;
2235
2236        if let SignedChannelState::Established {
2237            buffer_transaction, ..
2238        } = &signed_channel.state
2239        {
2240            self.chain_monitor.lock().await.add_tx(
2241                buffer_transaction.compute_txid(),
2242                ChannelInfo {
2243                    channel_id: signed_channel.channel_id,
2244                    tx_type: TxType::BufferTx,
2245                },
2246            );
2247        } else {
2248            unreachable!();
2249        }
2250
2251        self.store
2252            .upsert_channel(
2253                Channel::Signed(signed_channel),
2254                Some(Contract::Signed(signed_contract)),
2255            )
2256            .await?;
2257
2258        self.store
2259            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2260            .await?;
2261
2262        Ok(sign_channel)
2263    }
2264
2265    async fn on_sign_channel(
2266        &self,
2267        sign_channel: &SignChannel,
2268        peer_id: &PublicKey,
2269    ) -> Result<(), Error> {
2270        let accepted_channel =
2271            get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
2272        let accepted_contract = get_contract_in_state!(
2273            self,
2274            &accepted_channel.accepted_contract_id,
2275            Accepted,
2276            Some(*peer_id)
2277        )?;
2278
2279        let (signed_channel, signed_contract, signed_fund_tx) = {
2280            let res = verify_signed_channel(
2281                &self.secp,
2282                &accepted_channel,
2283                &accepted_contract,
2284                sign_channel,
2285                &self.wallet,
2286                &self.chain_monitor,
2287                &self.store,
2288                &self.signer_provider,
2289                &self.logger,
2290            )
2291            .await;
2292
2293            match res {
2294                Ok(res) => res,
2295                Err(e) => {
2296                    let channel = crate::channel::FailedSign {
2297                        channel_id: sign_channel.channel_id,
2298                        error_message: format!("Error validating accept channel: {e}"),
2299                        sign_message: sign_channel.clone(),
2300                        counter_party: *peer_id,
2301                    };
2302                    self.store
2303                        .upsert_channel(Channel::FailedSign(channel), None)
2304                        .await?;
2305                    return Err(e);
2306                }
2307            }
2308        };
2309
2310        if let SignedChannelState::Established {
2311            buffer_transaction, ..
2312        } = &signed_channel.state
2313        {
2314            self.chain_monitor.lock().await.add_tx(
2315                buffer_transaction.compute_txid(),
2316                ChannelInfo {
2317                    channel_id: signed_channel.channel_id,
2318                    tx_type: TxType::BufferTx,
2319                },
2320            );
2321        } else {
2322            unreachable!();
2323        }
2324
2325        self.blockchain.send_transaction(&signed_fund_tx).await?;
2326
2327        self.store
2328            .upsert_channel(
2329                Channel::Signed(signed_channel),
2330                Some(Contract::Signed(signed_contract)),
2331            )
2332            .await?;
2333        self.store
2334            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2335            .await?;
2336
2337        Ok(())
2338    }
2339
2340    async fn on_settle_offer(
2341        &self,
2342        settle_offer: &SettleOffer,
2343        peer_id: &PublicKey,
2344    ) -> Result<Option<Reject>, Error> {
2345        let mut signed_channel =
2346            get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2347
2348        if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2349            return Ok(Some(Reject {
2350                channel_id: settle_offer.channel_id,
2351            }));
2352        }
2353
2354        crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2355
2356        self.store
2357            .upsert_channel(Channel::Signed(signed_channel), None)
2358            .await?;
2359
2360        Ok(None)
2361    }
2362
2363    async fn on_settle_accept(
2364        &self,
2365        settle_accept: &SettleAccept,
2366        peer_id: &PublicKey,
2367    ) -> Result<SettleConfirm, Error> {
2368        let mut signed_channel =
2369            get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2370
2371        let msg = crate::channel_updater::settle_channel_confirm(
2372            &self.secp,
2373            &mut signed_channel,
2374            settle_accept,
2375            CET_NSEQUENCE,
2376            0,
2377            PEER_TIMEOUT,
2378            &self.signer_provider,
2379            &self.time,
2380            &self.chain_monitor,
2381        )
2382        .await?;
2383
2384        self.store
2385            .upsert_channel(Channel::Signed(signed_channel), None)
2386            .await?;
2387
2388        Ok(msg)
2389    }
2390
2391    async fn on_settle_confirm(
2392        &self,
2393        settle_confirm: &SettleConfirm,
2394        peer_id: &PublicKey,
2395    ) -> Result<SettleFinalize, Error> {
2396        let mut signed_channel =
2397            get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2398        let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2399        let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2400            signed_channel,
2401            Established,
2402            buffer_transaction,
2403            own_buffer_adaptor_signature,
2404            is_offer,
2405            signed_contract_id
2406        )?;
2407
2408        let prev_buffer_txid = prev_buffer_tx.compute_txid();
2409        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2410        let is_offer = *is_offer;
2411        let signed_contract_id = *signed_contract_id;
2412
2413        let msg = crate::channel_updater::settle_channel_finalize(
2414            &self.secp,
2415            &mut signed_channel,
2416            settle_confirm,
2417            &self.signer_provider,
2418        )?;
2419
2420        self.chain_monitor.lock().await.add_tx(
2421            prev_buffer_txid,
2422            ChannelInfo {
2423                channel_id: signed_channel.channel_id,
2424                tx_type: TxType::Revoked {
2425                    update_idx: signed_channel.update_idx + 1,
2426                    own_adaptor_signature: own_buffer_adaptor_signature,
2427                    is_offer,
2428                    revoked_tx_type: RevokedTxType::Buffer,
2429                },
2430            },
2431        );
2432
2433        let closed_contract = Contract::Closed(
2434            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2435                .await?,
2436        );
2437
2438        self.store
2439            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2440            .await?;
2441        self.store
2442            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2443            .await?;
2444
2445        Ok(msg)
2446    }
2447
2448    async fn on_settle_finalize(
2449        &self,
2450        settle_finalize: &SettleFinalize,
2451        peer_id: &PublicKey,
2452    ) -> Result<(), Error> {
2453        let mut signed_channel =
2454            get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2455        let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2456        let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2457            signed_channel,
2458            Established,
2459            buffer_transaction,
2460            own_buffer_adaptor_signature,
2461            is_offer,
2462            signed_contract_id
2463        )?;
2464
2465        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2466        let is_offer = *is_offer;
2467        let buffer_txid = buffer_tx.compute_txid();
2468        let signed_contract_id = *signed_contract_id;
2469
2470        crate::channel_updater::settle_channel_on_finalize(
2471            &self.secp,
2472            &mut signed_channel,
2473            settle_finalize,
2474        )?;
2475
2476        self.chain_monitor.lock().await.add_tx(
2477            buffer_txid,
2478            ChannelInfo {
2479                channel_id: signed_channel.channel_id,
2480                tx_type: TxType::Revoked {
2481                    update_idx: signed_channel.update_idx + 1,
2482                    own_adaptor_signature: own_buffer_adaptor_signature,
2483                    is_offer,
2484                    revoked_tx_type: RevokedTxType::Buffer,
2485                },
2486            },
2487        );
2488
2489        let closed_contract = Contract::Closed(
2490            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2491                .await?,
2492        );
2493        self.store
2494            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2495            .await?;
2496        self.store
2497            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2498            .await?;
2499
2500        Ok(())
2501    }
2502
2503    async fn on_renew_offer(
2504        &self,
2505        renew_offer: &RenewOffer,
2506        peer_id: &PublicKey,
2507    ) -> Result<Option<Reject>, Error> {
2508        let mut signed_channel =
2509            get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2510
2511        // Received a renew offer when we already sent one, we reject it.
2512        if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2513            if is_offer {
2514                return Ok(Some(Reject {
2515                    channel_id: renew_offer.channel_id,
2516                }));
2517            }
2518        }
2519
2520        let offered_contract = crate::channel_updater::on_renew_offer(
2521            &mut signed_channel,
2522            renew_offer,
2523            PEER_TIMEOUT,
2524            &self.time,
2525        )?;
2526
2527        self.store.create_contract(&offered_contract).await?;
2528        self.store
2529            .upsert_channel(Channel::Signed(signed_channel), None)
2530            .await?;
2531
2532        Ok(None)
2533    }
2534
2535    async fn on_renew_accept(
2536        &self,
2537        renew_accept: &RenewAccept,
2538        peer_id: &PublicKey,
2539    ) -> Result<RenewConfirm, Error> {
2540        let mut signed_channel =
2541            get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2542        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2543            Error::InvalidState(
2544                "Expected to be in a state with an associated contract id but was not.".to_string(),
2545            )
2546        })?;
2547
2548        let offered_contract =
2549            get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2550
2551        let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2552            &self.secp,
2553            renew_accept,
2554            &mut signed_channel,
2555            &offered_contract,
2556            CET_NSEQUENCE,
2557            PEER_TIMEOUT,
2558            &self.wallet,
2559            &self.signer_provider,
2560            &self.time,
2561            &self.store,
2562            &self.logger,
2563        )
2564        .await?;
2565
2566        // Directly confirmed as we're in a channel the fund tx is already confirmed.
2567        self.store
2568            .upsert_channel(
2569                Channel::Signed(signed_channel),
2570                Some(Contract::Confirmed(signed_contract)),
2571            )
2572            .await?;
2573
2574        Ok(msg)
2575    }
2576
2577    async fn on_renew_confirm(
2578        &self,
2579        renew_confirm: &RenewConfirm,
2580        peer_id: &PublicKey,
2581    ) -> Result<RenewFinalize, Error> {
2582        let mut signed_channel =
2583            get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2584        let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2585        let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2586            Error::InvalidState(
2587                "Expected to be in a state with an associated contract id but was not.".to_string(),
2588            )
2589        })?;
2590
2591        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2592            .roll_back_state
2593            .as_ref()
2594            .expect("to have a rollback state")
2595        {
2596            SignedChannelState::Established {
2597                own_buffer_adaptor_signature,
2598                buffer_transaction,
2599                signed_contract_id,
2600                ..
2601            } => {
2602                let closed_contract = Contract::Closed(
2603                    self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2604                        .await?,
2605                );
2606                (
2607                    TxType::Revoked {
2608                        update_idx: signed_channel.update_idx,
2609                        own_adaptor_signature: *own_buffer_adaptor_signature,
2610                        is_offer: false,
2611                        revoked_tx_type: RevokedTxType::Buffer,
2612                    },
2613                    buffer_transaction.compute_txid(),
2614                    Some(closed_contract),
2615                )
2616            }
2617            SignedChannelState::Settled {
2618                settle_tx,
2619                own_settle_adaptor_signature,
2620                ..
2621            } => (
2622                TxType::Revoked {
2623                    update_idx: signed_channel.update_idx,
2624                    own_adaptor_signature: *own_settle_adaptor_signature,
2625                    is_offer: false,
2626                    revoked_tx_type: RevokedTxType::Settle,
2627                },
2628                settle_tx.compute_txid(),
2629                None,
2630            ),
2631            s => {
2632                return Err(Error::InvalidState(format!(
2633                    "Expected rollback state Established or Revoked but found {s:?}"
2634                )))
2635            }
2636        };
2637        let accepted_contract =
2638            get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2639
2640        let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2641            &self.secp,
2642            &mut signed_channel,
2643            &accepted_contract,
2644            renew_confirm,
2645            PEER_TIMEOUT,
2646            &self.time,
2647            &self.wallet,
2648            &self.signer_provider,
2649            &self.chain_monitor,
2650            &self.store,
2651            &self.logger,
2652        )
2653        .await?;
2654
2655        self.chain_monitor.lock().await.add_tx(
2656            prev_tx_id,
2657            ChannelInfo {
2658                channel_id: signed_channel.channel_id,
2659                tx_type,
2660            },
2661        );
2662
2663        // Directly confirmed as we're in a channel the fund tx is already confirmed.
2664        self.store
2665            .upsert_channel(
2666                Channel::Signed(signed_channel),
2667                Some(Contract::Confirmed(signed_contract)),
2668            )
2669            .await?;
2670
2671        self.store
2672            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2673            .await?;
2674
2675        if let Some(closed_contract) = closed_contract {
2676            self.store.update_contract(&closed_contract).await?;
2677        }
2678
2679        Ok(msg)
2680    }
2681
2682    async fn on_renew_finalize(
2683        &self,
2684        renew_finalize: &RenewFinalize,
2685        peer_id: &PublicKey,
2686    ) -> Result<RenewRevoke, Error> {
2687        let mut signed_channel =
2688            get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2689        let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2690
2691        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2692            .roll_back_state
2693            .as_ref()
2694            .expect("to have a rollback state")
2695        {
2696            SignedChannelState::Established {
2697                own_buffer_adaptor_signature,
2698                buffer_transaction,
2699                signed_contract_id,
2700                ..
2701            } => {
2702                let closed_contract = self
2703                    .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2704                    .await?;
2705                (
2706                    TxType::Revoked {
2707                        update_idx: signed_channel.update_idx,
2708                        own_adaptor_signature: *own_buffer_adaptor_signature,
2709                        is_offer: false,
2710                        revoked_tx_type: RevokedTxType::Buffer,
2711                    },
2712                    buffer_transaction.compute_txid(),
2713                    Some(Contract::Closed(closed_contract)),
2714                )
2715            }
2716            SignedChannelState::Settled {
2717                settle_tx,
2718                own_settle_adaptor_signature,
2719                ..
2720            } => (
2721                TxType::Revoked {
2722                    update_idx: signed_channel.update_idx,
2723                    own_adaptor_signature: *own_settle_adaptor_signature,
2724                    is_offer: false,
2725                    revoked_tx_type: RevokedTxType::Settle,
2726                },
2727                settle_tx.compute_txid(),
2728                None,
2729            ),
2730            s => {
2731                return Err(Error::InvalidState(format!(
2732                    "Expected rollback state of Established or Settled but was {s:?}"
2733                )))
2734            }
2735        };
2736
2737        let msg = crate::channel_updater::renew_channel_on_finalize(
2738            &self.secp,
2739            &mut signed_channel,
2740            renew_finalize,
2741            &self.signer_provider,
2742        )?;
2743
2744        self.chain_monitor.lock().await.add_tx(
2745            prev_tx_id,
2746            ChannelInfo {
2747                channel_id: signed_channel.channel_id,
2748                tx_type,
2749            },
2750        );
2751
2752        let buffer_tx =
2753            get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2754
2755        self.chain_monitor.lock().await.add_tx(
2756            buffer_tx.compute_txid(),
2757            ChannelInfo {
2758                channel_id: signed_channel.channel_id,
2759                tx_type: TxType::BufferTx,
2760            },
2761        );
2762
2763        self.store
2764            .upsert_channel(Channel::Signed(signed_channel), None)
2765            .await?;
2766        self.store
2767            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2768            .await?;
2769
2770        if let Some(closed_contract) = closed_contract {
2771            self.store.update_contract(&closed_contract).await?;
2772        }
2773
2774        Ok(msg)
2775    }
2776
2777    async fn on_renew_revoke(
2778        &self,
2779        renew_revoke: &RenewRevoke,
2780        peer_id: &PublicKey,
2781    ) -> Result<(), Error> {
2782        let mut signed_channel =
2783            get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2784
2785        crate::channel_updater::renew_channel_on_revoke(
2786            &self.secp,
2787            &mut signed_channel,
2788            renew_revoke,
2789        )?;
2790
2791        self.store
2792            .upsert_channel(Channel::Signed(signed_channel), None)
2793            .await?;
2794
2795        Ok(())
2796    }
2797
2798    async fn on_collaborative_close_offer(
2799        &self,
2800        close_offer: &CollaborativeCloseOffer,
2801        peer_id: &PublicKey,
2802    ) -> Result<(), Error> {
2803        let mut signed_channel =
2804            get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2805
2806        crate::channel_updater::on_collaborative_close_offer(
2807            &mut signed_channel,
2808            close_offer,
2809            PEER_TIMEOUT,
2810            &self.time,
2811        )?;
2812
2813        self.store
2814            .upsert_channel(Channel::Signed(signed_channel), None)
2815            .await?;
2816
2817        Ok(())
2818    }
2819
2820    async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2821        let channel = self.store.get_channel(&reject.channel_id).await?;
2822
2823        if let Some(channel) = channel {
2824            if channel.get_counter_party_id() != *counter_party {
2825                return Err(Error::InvalidParameters(format!(
2826                    "Peer {:02x?} is not involved with {} {:02x?}.",
2827                    counter_party,
2828                    stringify!(Channel),
2829                    channel.get_id()
2830                )));
2831            }
2832            match channel {
2833                Channel::Offered(offered_channel) => {
2834                    let offered_contract = get_contract_in_state!(
2835                        self,
2836                        &offered_channel.offered_contract_id,
2837                        Offered,
2838                        None as Option<PublicKey>
2839                    )?;
2840                    let utxos = offered_contract
2841                        .funding_inputs
2842                        .iter()
2843                        .map(|funding_input| {
2844                            let txid = Transaction::consensus_decode(
2845                                &mut funding_input.prev_tx.as_slice(),
2846                            )
2847                            .expect("Transaction Decode Error")
2848                            .compute_txid();
2849                            let vout = funding_input.prev_tx_vout;
2850                            OutPoint { txid, vout }
2851                        })
2852                        .collect::<Vec<_>>();
2853
2854                    self.wallet.unreserve_utxos(&utxos)?;
2855
2856                    // remove rejected channel, since nothing has been confirmed on chain yet.
2857                    self.store
2858                        .upsert_channel(
2859                            Channel::Cancelled(offered_channel),
2860                            Some(Contract::Rejected(offered_contract)),
2861                        )
2862                        .await?;
2863                }
2864                Channel::Signed(mut signed_channel) => {
2865                    let contract = match signed_channel.state {
2866                        SignedChannelState::RenewOffered {
2867                            offered_contract_id,
2868                            ..
2869                        } => {
2870                            let offered_contract = get_contract_in_state!(
2871                                self,
2872                                &offered_contract_id,
2873                                Offered,
2874                                None::<PublicKey>
2875                            )?;
2876                            Some(Contract::Rejected(offered_contract))
2877                        }
2878                        _ => None,
2879                    };
2880
2881                    crate::channel_updater::on_reject(&mut signed_channel)?;
2882
2883                    self.store
2884                        .upsert_channel(Channel::Signed(signed_channel), contract)
2885                        .await?;
2886                }
2887                channel => {
2888                    return Err(Error::InvalidState(format!(
2889                        "Not in a state adequate to receive a reject message. {channel:?}"
2890                    )))
2891                }
2892            }
2893        } else {
2894            log_warn!(
2895                self.logger,
2896                "Couldn't find rejected dlc channel with id: {}",
2897                reject.channel_id.to_lower_hex_string()
2898            );
2899        }
2900
2901        Ok(())
2902    }
2903
2904    async fn channel_checks(&self) -> Result<(), Error> {
2905        let established_closing_channels = self
2906            .store
2907            .get_signed_channels(Some(SignedChannelStateType::Closing))
2908            .await?;
2909
2910        for channel in established_closing_channels {
2911            if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2912                log_error!(
2913                    self.logger,
2914                    "Error trying to close established channel: {}",
2915                    e
2916                );
2917            }
2918        }
2919
2920        if let Err(e) = self.check_for_timed_out_channels().await {
2921            log_error!(self.logger, "Error checking timed out channels {}", e);
2922        }
2923        self.check_for_watched_tx().await
2924    }
2925
    /// Invokes the `check_for_timed_out_channels!` macro once for each
    /// intermediate renew and settle state listed below, in this order.
    ///
    /// NOTE(review): the macro is defined outside this section; presumably it
    /// handles signed channels that stayed in the given state past the peer
    /// timeout and may return early on error — confirm against its definition.
    async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
        // Renew protocol states.
        check_for_timed_out_channels!(self, RenewOffered);
        check_for_timed_out_channels!(self, RenewAccepted);
        check_for_timed_out_channels!(self, RenewConfirmed);
        // Settle protocol states.
        check_for_timed_out_channels!(self, SettledOffered);
        check_for_timed_out_channels!(self, SettledAccepted);
        check_for_timed_out_channels!(self, SettledConfirmed);

        Ok(())
    }
2936
2937    pub(crate) async fn process_watched_txs(
2938        &self,
2939        watched_txs: Vec<(Transaction, ChannelInfo)>,
2940    ) -> Result<(), Error> {
2941        for (tx, channel_info) in watched_txs {
2942            let mut signed_channel = match get_channel_in_state!(
2943                self,
2944                &channel_info.channel_id,
2945                Signed,
2946                None as Option<PublicKey>
2947            ) {
2948                Ok(c) => c,
2949                Err(e) => {
2950                    log_error!(
2951                        self.logger,
2952                        "Could not retrieve channel {:?}: {}",
2953                        channel_info.channel_id,
2954                        e
2955                    );
2956                    continue;
2957                }
2958            };
2959
2960            let persist = match channel_info.tx_type {
2961                TxType::BufferTx => {
2962                    // TODO(tibo): should only considered closed after some confirmations.
2963                    // Ideally should save previous state, and maybe restore in
2964                    // case of reorg, though if the counter party has sent the
2965                    // tx to close the channel it is unlikely that the tx will
2966                    // not be part of a future block.
2967
2968                    let contract_id = signed_channel
2969                        .get_contract_id()
2970                        .expect("to have a contract id");
2971                    let mut state = SignedChannelState::Closing {
2972                        buffer_transaction: tx.clone(),
2973                        is_initiator: false,
2974                        contract_id,
2975                        keys_id: signed_channel.keys_id().ok_or_else(|| {
2976                            Error::InvalidState("Expected to have keys_id.".to_string())
2977                        })?,
2978                    };
2979                    std::mem::swap(&mut signed_channel.state, &mut state);
2980
2981                    signed_channel.roll_back_state = Some(state);
2982
2983                    self.store
2984                        .upsert_channel(Channel::Signed(signed_channel), None)
2985                        .await?;
2986
2987                    false
2988                }
2989                TxType::Revoked {
2990                    update_idx,
2991                    own_adaptor_signature,
2992                    is_offer,
2993                    revoked_tx_type,
2994                } => {
2995                    let secret = signed_channel
2996                        .counter_party_commitment_secrets
2997                        .get_secret(update_idx)
2998                        .expect("to be able to retrieve the per update secret");
2999                    let counter_per_update_secret = SecretKey::from_slice(&secret)
3000                        .expect("to be able to parse the counter per update secret.");
3001
3002                    let per_update_seed_pk = signed_channel.own_per_update_seed;
3003
3004                    let per_update_seed_sk = self
3005                        .signer_provider
3006                        .get_secret_key_for_pubkey(&per_update_seed_pk)?;
3007
3008                    let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
3009                        per_update_seed_sk.as_ref(),
3010                        update_idx,
3011                    ))
3012                    .expect("a valid secret key.");
3013
3014                    let per_update_point =
3015                        PublicKey::from_secret_key(&self.secp, &per_update_secret);
3016
3017                    let own_revocation_params = signed_channel.own_points.get_revokable_params(
3018                        &self.secp,
3019                        &signed_channel.counter_points.revocation_basepoint,
3020                        &per_update_point,
3021                    );
3022
3023                    let counter_per_update_point =
3024                        PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
3025
3026                    let base_own_sk = self
3027                        .signer_provider
3028                        .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
3029
3030                    let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
3031
3032                    let counter_revocation_params =
3033                        signed_channel.counter_points.get_revokable_params(
3034                            &self.secp,
3035                            &signed_channel.own_points.revocation_basepoint,
3036                            &counter_per_update_point,
3037                        );
3038
3039                    let witness = if signed_channel.own_params.fund_pubkey
3040                        < signed_channel.counter_params.fund_pubkey
3041                    {
3042                        tx.input[0].witness.to_vec().remove(1)
3043                    } else {
3044                        tx.input[0].witness.to_vec().remove(2)
3045                    };
3046
3047                    let sig_data = witness
3048                        .iter()
3049                        .take(witness.len() - 1)
3050                        .cloned()
3051                        .collect::<Vec<_>>();
3052                    let own_sig = Signature::from_der(&sig_data)?;
3053
3054                    let counter_sk = own_adaptor_signature.recover(
3055                        &self.secp,
3056                        &own_sig,
3057                        &counter_revocation_params.publish_pk.inner,
3058                    )?;
3059
3060                    let own_revocation_base_secret =
3061                        &self.signer_provider.get_secret_key_for_pubkey(
3062                            &signed_channel.own_points.revocation_basepoint,
3063                        )?;
3064
3065                    let counter_revocation_sk = derive_private_revocation_key(
3066                        &self.secp,
3067                        &counter_per_update_secret,
3068                        own_revocation_base_secret,
3069                    );
3070
3071                    let (offer_params, accept_params) = if is_offer {
3072                        (&own_revocation_params, &counter_revocation_params)
3073                    } else {
3074                        (&counter_revocation_params, &own_revocation_params)
3075                    };
3076
3077                    let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
3078                        lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
3079                    ) / 250)
3080                        .into();
3081
3082                    let signed_tx = match revoked_tx_type {
3083                        RevokedTxType::Buffer => {
3084                            ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
3085                                &self.secp,
3086                                offer_params,
3087                                accept_params,
3088                                &own_sk,
3089                                &counter_sk,
3090                                &counter_revocation_sk,
3091                                &tx,
3092                                &self.wallet.get_new_address().await?,
3093                                0,
3094                                fee_rate_per_vb,
3095                            )?
3096                        }
3097                        RevokedTxType::Settle => {
3098                            ddk_dlc::channel::create_and_sign_punish_settle_transaction(
3099                                &self.secp,
3100                                offer_params,
3101                                accept_params,
3102                                &own_sk,
3103                                &counter_sk,
3104                                &counter_revocation_sk,
3105                                &tx,
3106                                &self.wallet.get_new_address().await?,
3107                                CET_NSEQUENCE,
3108                                0,
3109                                fee_rate_per_vb,
3110                                is_offer,
3111                            )?
3112                        }
3113                    };
3114
3115                    self.blockchain.send_transaction(&signed_tx).await?;
3116
3117                    let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
3118                        counter_party: signed_channel.counter_party,
3119                        temporary_channel_id: signed_channel.temporary_channel_id,
3120                        channel_id: signed_channel.channel_id,
3121                        punish_txid: signed_tx.compute_txid(),
3122                    });
3123
3124                    //TODO(tibo): should probably make sure the tx is confirmed somewhere before
3125                    //stop watching the cheating tx.
3126                    self.chain_monitor
3127                        .lock()
3128                        .await
3129                        .cleanup_channel(signed_channel.channel_id);
3130                    self.store.upsert_channel(closed_channel, None).await?;
3131                    true
3132                }
3133                TxType::CollaborativeClose => {
3134                    if let Some(SignedChannelState::Established {
3135                        signed_contract_id, ..
3136                    }) = signed_channel.roll_back_state
3137                    {
3138                        let counter_payout = get_signed_channel_state!(
3139                            signed_channel,
3140                            CollaborativeCloseOffered,
3141                            counter_payout
3142                        )?;
3143                        let closed_contract = self
3144                            .get_collaboratively_closed_contract(
3145                                &signed_contract_id,
3146                                *counter_payout,
3147                                false,
3148                            )
3149                            .await?;
3150                        self.store
3151                            .update_contract(&Contract::Closed(closed_contract))
3152                            .await?;
3153                    }
3154                    let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
3155                        counter_party: signed_channel.counter_party,
3156                        temporary_channel_id: signed_channel.temporary_channel_id,
3157                        channel_id: signed_channel.channel_id,
3158                    });
3159                    self.chain_monitor
3160                        .lock()
3161                        .await
3162                        .cleanup_channel(signed_channel.channel_id);
3163                    self.store.upsert_channel(closed_channel, None).await?;
3164                    true
3165                }
3166                TxType::SettleTx => {
3167                    let closed_channel = Channel::CounterClosed(ClosedChannel {
3168                        counter_party: signed_channel.counter_party,
3169                        temporary_channel_id: signed_channel.temporary_channel_id,
3170                        channel_id: signed_channel.channel_id,
3171                    });
3172                    self.chain_monitor
3173                        .lock()
3174                        .await
3175                        .cleanup_channel(signed_channel.channel_id);
3176                    self.store.upsert_channel(closed_channel, None).await?;
3177                    true
3178                }
3179                TxType::Cet => {
3180                    let contract_id = signed_channel.get_contract_id();
3181                    let closed_channel = {
3182                        match &signed_channel.state {
3183                            SignedChannelState::Closing { is_initiator, .. } => {
3184                                if *is_initiator {
3185                                    Channel::Closed(ClosedChannel {
3186                                        counter_party: signed_channel.counter_party,
3187                                        temporary_channel_id: signed_channel.temporary_channel_id,
3188                                        channel_id: signed_channel.channel_id,
3189                                    })
3190                                } else {
3191                                    Channel::CounterClosed(ClosedChannel {
3192                                        counter_party: signed_channel.counter_party,
3193                                        temporary_channel_id: signed_channel.temporary_channel_id,
3194                                        channel_id: signed_channel.channel_id,
3195                                    })
3196                                }
3197                            }
3198                            _ => {
3199                                log_error!(self.logger, "Saw spending of buffer transaction without being in closing state");
3200                                Channel::Closed(ClosedChannel {
3201                                    counter_party: signed_channel.counter_party,
3202                                    temporary_channel_id: signed_channel.temporary_channel_id,
3203                                    channel_id: signed_channel.channel_id,
3204                                })
3205                            }
3206                        }
3207                    };
3208
3209                    self.chain_monitor
3210                        .lock()
3211                        .await
3212                        .cleanup_channel(signed_channel.channel_id);
3213                    let pre_closed_contract = futures::stream::iter(contract_id)
3214                        .then(|contract_id| {
3215                            let txn = tx.clone();
3216                            async move {
3217                                self.store.get_contract(&contract_id).await.map(|contract| {
3218                                    contract.and_then(|contract| match contract {
3219                                        Contract::Confirmed(signed_contract) => {
3220                                            Some(Contract::PreClosed(PreClosedContract {
3221                                                signed_contract,
3222                                                attestations: None,
3223                                                signed_cet: txn,
3224                                            }))
3225                                        }
3226                                        _ => None,
3227                                    })
3228                                })
3229                            }
3230                        })
3231                        .try_collect::<Vec<_>>()
3232                        .await?
3233                        .into_iter()
3234                        .flatten()
3235                        .next();
3236
3237                    self.store
3238                        .upsert_channel(closed_channel, pre_closed_contract)
3239                        .await?;
3240
3241                    true
3242                }
3243            };
3244
3245            if persist {
3246                self.store
3247                    .persist_chain_monitor(&*self.chain_monitor.lock().await)
3248                    .await?;
3249            }
3250        }
3251        Ok(())
3252    }
3253
3254    async fn check_for_watched_tx(&self) -> Result<(), Error> {
3255        let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
3256
3257        self.process_watched_txs(confirmed_txs).await?;
3258
3259        self.store
3260            .persist_chain_monitor(&*self.chain_monitor.lock().await)
3261            .await?;
3262
3263        Ok(())
3264    }
3265
3266    async fn force_close_channel_internal(
3267        &self,
3268        mut channel: SignedChannel,
3269        is_initiator: bool,
3270    ) -> Result<(), Error> {
3271        match &channel.state {
3272            SignedChannelState::Established {
3273                counter_buffer_adaptor_signature,
3274                buffer_transaction,
3275                ..
3276            } => {
3277                let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
3278                let buffer_transaction = buffer_transaction.clone();
3279                self.initiate_unilateral_close_established_channel(
3280                    channel,
3281                    is_initiator,
3282                    counter_buffer_adaptor_signature,
3283                    buffer_transaction,
3284                )
3285                .await
3286            }
3287            SignedChannelState::RenewFinalized {
3288                buffer_transaction,
3289                offer_buffer_adaptor_signature,
3290                ..
3291            } => {
3292                let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
3293                let buffer_transaction = buffer_transaction.clone();
3294                self.initiate_unilateral_close_established_channel(
3295                    channel,
3296                    is_initiator,
3297                    offer_buffer_adaptor_signature,
3298                    buffer_transaction,
3299                )
3300                .await
3301            }
3302            SignedChannelState::Settled { .. } => {
3303                self.close_settled_channel(channel, is_initiator).await
3304            }
3305            SignedChannelState::SettledOffered { .. }
3306            | SignedChannelState::SettledReceived { .. }
3307            | SignedChannelState::SettledAccepted { .. }
3308            | SignedChannelState::SettledConfirmed { .. }
3309            | SignedChannelState::RenewOffered { .. }
3310            | SignedChannelState::RenewAccepted { .. }
3311            | SignedChannelState::RenewConfirmed { .. }
3312            | SignedChannelState::CollaborativeCloseOffered { .. } => {
3313                channel.state = channel
3314                    .roll_back_state
3315                    .take()
3316                    .expect("to have a rollback state");
3317                let channel_clone = channel.clone(); // Clone the channel to avoid moving it
3318                Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
3319            }
3320            SignedChannelState::Closing { .. } => Err(Error::InvalidState(
3321                "Channel is already closing.".to_string(),
3322            )),
3323        }
3324    }
3325
3326    /// Initiate the unilateral closing of a channel that has been established.
3327    async fn initiate_unilateral_close_established_channel(
3328        &self,
3329        mut signed_channel: SignedChannel,
3330        is_initiator: bool,
3331        buffer_adaptor_signature: EcdsaAdaptorSignature,
3332        buffer_transaction: Transaction,
3333    ) -> Result<(), Error> {
3334        let keys_id = signed_channel.keys_id().ok_or_else(|| {
3335            Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
3336        })?;
3337
3338        crate::channel_updater::initiate_unilateral_close_established_channel(
3339            &self.secp,
3340            &mut signed_channel,
3341            buffer_adaptor_signature,
3342            keys_id,
3343            buffer_transaction,
3344            &self.signer_provider,
3345            is_initiator,
3346        )?;
3347
3348        let buffer_transaction =
3349            get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3350
3351        self.blockchain.send_transaction(buffer_transaction).await?;
3352
3353        self.chain_monitor
3354            .lock()
3355            .await
3356            .remove_tx(&buffer_transaction.compute_txid());
3357
3358        self.store
3359            .upsert_channel(Channel::Signed(signed_channel), None)
3360            .await?;
3361
3362        self.store
3363            .persist_chain_monitor(&*self.chain_monitor.lock().await)
3364            .await?;
3365
3366        Ok(())
3367    }
3368
3369    /// Unilaterally close a channel that has been settled.
3370    async fn close_settled_channel(
3371        &self,
3372        signed_channel: SignedChannel,
3373        is_initiator: bool,
3374    ) -> Result<(), Error> {
3375        let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3376            &self.secp,
3377            &signed_channel,
3378            &self.signer_provider,
3379            is_initiator,
3380        )?;
3381
3382        if self
3383            .blockchain
3384            .get_transaction_confirmations(&settle_tx.compute_txid())
3385            .await
3386            .unwrap_or(0)
3387            == 0
3388        {
3389            self.blockchain.send_transaction(&settle_tx).await?;
3390        }
3391
3392        self.chain_monitor
3393            .lock()
3394            .await
3395            .cleanup_channel(signed_channel.channel_id);
3396
3397        self.store.upsert_channel(closed_channel, None).await?;
3398
3399        Ok(())
3400    }
3401
3402    async fn get_collaboratively_closed_contract(
3403        &self,
3404        contract_id: &ContractId,
3405        payout: Amount,
3406        is_own_payout: bool,
3407    ) -> Result<ClosedContract, Error> {
3408        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3409        let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3410            contract
3411                .accepted_contract
3412                .offered_contract
3413                .offer_params
3414                .collateral
3415        } else {
3416            contract.accepted_contract.accept_params.collateral
3417        };
3418        let own_payout = if is_own_payout {
3419            payout
3420        } else {
3421            contract.accepted_contract.offered_contract.total_collateral - payout
3422        };
3423        let pnl =
3424            SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3425        Ok(ClosedContract {
3426            attestations: None,
3427            signed_cet: None,
3428            contract_id: *contract_id,
3429            temporary_contract_id: contract.accepted_contract.offered_contract.id,
3430            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3431            funding_txid: contract
3432                .accepted_contract
3433                .dlc_transactions
3434                .fund
3435                .compute_txid(),
3436            pnl,
3437        })
3438    }
3439
3440    async fn oracle_announcements(
3441        &self,
3442        contract_input: &ContractInput,
3443    ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3444        let announcements = stream::iter(contract_input.contract_infos.iter())
3445            .map(|x| {
3446                let future = self.get_oracle_announcements(&x.oracles);
3447                async move {
3448                    match future.await {
3449                        Ok(result) => Ok(result),
3450                        Err(e) => {
3451                            log_error!(self.logger, "Failed to get oracle announcements: {}", e);
3452                            Err(e)
3453                        }
3454                    }
3455                }
3456            })
3457            .collect::<FuturesUnordered<_>>()
3458            .await
3459            .try_collect::<Vec<_>>()
3460            .await?;
3461        Ok(announcements)
3462    }
3463}