ddk_manager/
manager.rs

//! # Manager
//!
//! A component to create and update DLCs.
2
3use super::{
4    Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13    accepted_contract::AcceptedContract, contract_info::ContractInfo,
14    contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15    signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16    FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::hex::DisplayHex;
26use bitcoin::{Address, Amount, SignedAmount};
27use bitcoin::{OutPoint, Transaction};
28use ddk_messages::channel::{
29    AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
30    RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
31    SettleOffer, SignChannel,
32};
33use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
34use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
35use futures::stream;
36use futures::stream::FuturesUnordered;
37use futures::{StreamExt, TryStreamExt};
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40    build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use lightning::util::logger::Logger;
43use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
44use once_cell::sync::Lazy;
45use secp256k1_zkp::XOnlyPublicKey;
46use secp256k1_zkp::{
47    ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
48};
49use std::collections::HashMap;
50use std::ops::Deref;
51use std::string::ToString;
52use std::sync::Arc;
53use tokio::sync::Mutex;
54
55/// The number of confirmations required before moving the the confirmed state.
56pub const DEFAULT_NB_CONFIRMATIONS: u32 = 3;
57
58/// The number of confirmations required before moving the the confirmed state.
59/// Uses the NB_CONFIRMATIONS environment variable to set the value.
60static NB_CONFIRMATIONS: Lazy<u32> = Lazy::new(|| match std::env::var("NB_CONFIRMATIONS") {
61    Ok(val) => val.parse().unwrap_or(DEFAULT_NB_CONFIRMATIONS),
62    Err(_) => DEFAULT_NB_CONFIRMATIONS,
63});
64
65/// The delay to set the refund value to.
66pub const REFUND_DELAY: u32 = 86400 * 7;
67/// The nSequence value used for CETs in DLC channels
68pub const CET_NSEQUENCE: u32 = 288;
69/// Timeout in seconds when waiting for a peer's reply, after which a DLC channel
70/// is forced closed.
71pub const PEER_TIMEOUT: u64 = 3600;
72
73type ClosableContractInfo<'a> = Option<(
74    &'a ContractInfo,
75    &'a AdaptorInfo,
76    Vec<(usize, OracleAttestation)>,
77)>;
78
/// Used to create and update DLCs.
///
/// Every collaborator is held behind a `Deref` handle so callers can choose the
/// pointer type they hand in, as long as the target implements the required trait.
#[derive(Debug)]
pub struct Manager<
    W: Deref,
    SP: Deref,
    B: Deref,
    S: Deref,
    O: Deref,
    T: Deref,
    F: Deref,
    X: ContractSigner,
    L: Deref,
> where
    W::Target: Wallet,
    SP::Target: ContractSignerProvider<Signer = X>,
    B::Target: Blockchain,
    S::Target: Storage,
    O::Target: Oracle,
    T::Target: Time,
    F::Target: FeeEstimator,
    L::Target: Logger,
{
    /// Known oracles, looked up by their x-only public key.
    oracles: HashMap<XOnlyPublicKey, O>,
    /// Wallet handle (used, among other things, to import funding addresses).
    wallet: W,
    /// Provider of the contract signers.
    signer_provider: SP,
    /// Blockchain access: chain height, blocks, confirmations and broadcasting.
    blockchain: B,
    /// Persistent storage for contracts, channels and the chain monitor.
    store: S,
    /// Secp256k1 context shared by all cryptographic operations.
    secp: Secp256k1<All>,
    /// Chain monitor, guarded by an async mutex.
    chain_monitor: Mutex<ChainMonitor>,
    /// Time source.
    time: T,
    /// Fee estimator.
    fee_estimator: F,
    /// Logger receiving the manager's diagnostics.
    logger: L,
}
112
/// Shorthand for `get_object_in_state!` specialised to contracts: forwards the
/// manager, contract id, expected state and optional peer id together with the
/// `Contract` type and the `get_contract` accessor.
macro_rules! get_contract_in_state {
    ($mgr: ident, $id: expr, $expected_state: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected_state, $peer, Contract, get_contract)
    }};
}
125
/// Shorthand for `get_object_in_state!` specialised to channels: forwards the
/// manager, channel id, expected state and optional peer id together with the
/// `Channel` type and the `get_channel` accessor.
macro_rules! get_channel_in_state {
    ($mgr: ident, $id: expr, $expected_state: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected_state, $peer, Channel, get_channel)
    }};
}
138
/// Extracts the listed fields from the roll-back state of a signed channel.
///
/// Evaluates to `Ok((field, ...))` (a tuple of references) when
/// `$signed_channel.roll_back_state` is `Some(SignedChannelState::$state { .. })`,
/// and to `Err(Error::InvalidState(..))` otherwise. The error reports the
/// roll-back state that was actually found (which is the value being matched),
/// not the channel's current state.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state { $($field,)* .. }) => Ok(($($field,)*)),
            _ => Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($state),
                $signed_channel.roll_back_state
            ))),
        }
    }};
}
147
/// Force closes every signed channel in state `$state` whose `timeout` lies
/// before the manager's current unix time.
///
/// Expands to statements that use `.await` and `?`, so it must be invoked inside
/// an async function returning a compatible `Result`. A failure to force close a
/// channel is logged and does not interrupt the loop.
macro_rules! check_for_timed_out_channels {
    ($manager: ident, $state: ident) => {
        let candidates = $manager
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))
            .await?;

        for channel in candidates {
            // Only channels that really are in the requested state carry a timeout.
            let SignedChannelState::$state { timeout, .. } = channel.state else {
                continue;
            };

            if timeout >= $manager.time.unix_time_now() {
                continue;
            }

            if let Err(e) = $manager.force_close_channel_internal(channel, true).await {
                log_error!($manager.logger, "Error force closing channel. error={}", e)
            }
        }
    };
}
170
171impl<
172        W: Deref,
173        SP: Deref,
174        B: Deref,
175        S: Deref,
176        O: Deref,
177        T: Deref,
178        F: Deref,
179        X: ContractSigner,
180        L: Deref,
181    > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
182where
183    W::Target: Wallet,
184    SP::Target: ContractSignerProvider<Signer = X>,
185    B::Target: Blockchain,
186    S::Target: Storage,
187    O::Target: Oracle,
188    T::Target: Time,
189    F::Target: FeeEstimator,
190    L::Target: Logger,
191{
192    /// Create a new Manager struct.
193    #[allow(clippy::too_many_arguments)]
194    #[tracing::instrument(skip_all)]
195    pub async fn new(
196        wallet: W,
197        signer_provider: SP,
198        blockchain: B,
199        store: S,
200        oracles: HashMap<XOnlyPublicKey, O>,
201        time: T,
202        fee_estimator: F,
203        logger: L,
204    ) -> Result<Self, Error> {
205        let init_height = blockchain.get_blockchain_height().await?;
206        let chain_monitor = Mutex::new(
207            store
208                .get_chain_monitor()
209                .await?
210                .unwrap_or(ChainMonitor::new(init_height)),
211        );
212
213        let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
214
215        log_info!(logger, "Manager initialized");
216
217        Ok(Manager {
218            secp: secp256k1_zkp::Secp256k1::new(),
219            wallet,
220            signer_provider,
221            blockchain,
222            store,
223            oracles,
224            time,
225            fee_estimator,
226            chain_monitor,
227            logger,
228        })
229    }
230
    /// Get the store from the Manager to access contracts.
    ///
    /// Returns a reference to the storage handle that was passed to the
    /// constructor; no copy is made.
    pub fn get_store(&self) -> &S {
        &self.store
    }
235
236    /// Function called to pass a DlcMessage to the Manager.
237    #[tracing::instrument(skip_all)]
238    pub async fn on_dlc_message(
239        &self,
240        msg: &DlcMessage,
241        counter_party: PublicKey,
242    ) -> Result<Option<DlcMessage>, Error> {
243        match msg {
244            DlcMessage::Offer(o) => {
245                log_debug!(self.logger, "Received offer message");
246                self.on_offer_message(o, counter_party).await?;
247                Ok(None)
248            }
249            DlcMessage::Accept(a) => {
250                log_debug!(self.logger, "Received accept message");
251                Ok(Some(self.on_accept_message(a, &counter_party).await?))
252            }
253            DlcMessage::Sign(s) => {
254                log_debug!(self.logger, "Received sign message");
255                self.on_sign_message(s, &counter_party).await?;
256                Ok(None)
257            }
258            DlcMessage::Close(c) => {
259                log_debug!(self.logger, "Received close message");
260                self.on_close_message(c, &counter_party).await?;
261                Ok(None)
262            }
263            DlcMessage::OfferChannel(o) => {
264                log_debug!(self.logger, "Received offer channel message");
265                self.on_offer_channel(o, counter_party).await?;
266                Ok(None)
267            }
268            DlcMessage::AcceptChannel(a) => {
269                log_debug!(self.logger, "Received accept channel message");
270                Ok(Some(DlcMessage::SignChannel(
271                    self.on_accept_channel(a, &counter_party).await?,
272                )))
273            }
274            DlcMessage::SignChannel(s) => {
275                log_debug!(self.logger, "Received sign channel message");
276                self.on_sign_channel(s, &counter_party).await?;
277                Ok(None)
278            }
279            DlcMessage::SettleOffer(s) => {
280                log_debug!(self.logger, "Received settle offer message");
281                match self.on_settle_offer(s, &counter_party).await? {
282                    Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
283                    None => Ok(None),
284                }
285            }
286            DlcMessage::SettleAccept(s) => {
287                log_debug!(self.logger, "Received settle accept message");
288                Ok(Some(DlcMessage::SettleConfirm(
289                    self.on_settle_accept(s, &counter_party).await?,
290                )))
291            }
292            DlcMessage::SettleConfirm(s) => {
293                log_debug!(self.logger, "Received settle confirm message");
294                Ok(Some(DlcMessage::SettleFinalize(
295                    self.on_settle_confirm(s, &counter_party).await?,
296                )))
297            }
298            DlcMessage::SettleFinalize(s) => {
299                log_debug!(self.logger, "Received settle finalize message");
300                self.on_settle_finalize(s, &counter_party).await?;
301                Ok(None)
302            }
303            DlcMessage::RenewOffer(r) => {
304                log_debug!(self.logger, "Received renew offer message");
305                match self.on_renew_offer(r, &counter_party).await? {
306                    Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
307                    None => Ok(None),
308                }
309            }
310            DlcMessage::RenewAccept(r) => {
311                log_debug!(self.logger, "Received renew accept message");
312                Ok(Some(DlcMessage::RenewConfirm(
313                    self.on_renew_accept(r, &counter_party).await?,
314                )))
315            }
316            DlcMessage::RenewConfirm(r) => {
317                log_debug!(self.logger, "Received renew confirm message");
318                Ok(Some(DlcMessage::RenewFinalize(
319                    self.on_renew_confirm(r, &counter_party).await?,
320                )))
321            }
322            DlcMessage::RenewFinalize(r) => {
323                log_debug!(self.logger, "Received renew finalize message");
324                let revoke = self.on_renew_finalize(r, &counter_party).await?;
325                Ok(Some(DlcMessage::RenewRevoke(revoke)))
326            }
327            DlcMessage::RenewRevoke(r) => {
328                log_debug!(self.logger, "Received renew revoke message");
329                self.on_renew_revoke(r, &counter_party).await?;
330                Ok(None)
331            }
332            DlcMessage::CollaborativeCloseOffer(c) => {
333                log_debug!(self.logger, "Received collaborative close offer message");
334                self.on_collaborative_close_offer(c, &counter_party).await?;
335                Ok(None)
336            }
337            DlcMessage::Reject(r) => {
338                log_debug!(self.logger, "Received reject message");
339                self.on_reject(r, &counter_party).await?;
340                Ok(None)
341            }
342        }
343    }
344
345    /// Create a new spliced offer
346    #[tracing::instrument(skip_all)]
347    pub async fn send_splice_offer(
348        &self,
349        contract_input: &ContractInput,
350        counter_party: PublicKey,
351        contract_id: &ContractId,
352    ) -> Result<OfferDlc, Error> {
353        let oracle_announcements = self.oracle_announcements(contract_input).await?;
354
355        self.send_splice_offer_with_announcements(
356            contract_input,
357            counter_party,
358            contract_id,
359            oracle_announcements,
360        )
361        .await
362    }
363
364    /// Creates a new offer DLC using an existing DLC as an input.
365    /// The new DLC MUST use a Confirmed contract as an input.
366    #[tracing::instrument(skip_all)]
367    pub async fn send_splice_offer_with_announcements(
368        &self,
369        contract_input: &ContractInput,
370        counter_party: PublicKey,
371        contract_id: &ContractId,
372        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
373    ) -> Result<OfferDlc, Error> {
374        let confirmed_contract =
375            get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
376
377        let dlc_input = confirmed_contract.get_dlc_input();
378
379        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
380            &self.secp,
381            contract_input,
382            oracle_announcements,
383            vec![dlc_input],
384            REFUND_DELAY,
385            &counter_party,
386            &self.wallet,
387            &self.blockchain,
388            &self.time,
389            &self.signer_provider,
390            &self.logger,
391        )
392        .await?;
393
394        offered_contract.validate()?;
395
396        self.store.create_contract(&offered_contract).await?;
397
398        Ok(offer_msg)
399    }
400
401    /// Function called to create a new DLC. The offered contract will be stored
402    /// and an OfferDlc message returned.
403    ///
404    /// This function will fetch the oracle announcements from the oracle.
405    #[tracing::instrument(skip_all)]
406    pub async fn send_offer(
407        &self,
408        contract_input: &ContractInput,
409        counter_party: PublicKey,
410    ) -> Result<OfferDlc, Error> {
411        // If the oracle announcement fails to retrieve, then log and continue.
412        let oracle_announcements = self.oracle_announcements(contract_input).await?;
413
414        self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
415            .await
416    }
417
418    /// Function called to create a new DLC. The offered contract will be stored
419    /// and an OfferDlc message returned.
420    ///
421    /// This function allows to pass the oracle announcements directly instead of
422    /// fetching them from the oracle.
423    #[tracing::instrument(skip_all)]
424    pub async fn send_offer_with_announcements(
425        &self,
426        contract_input: &ContractInput,
427        counter_party: PublicKey,
428        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
429    ) -> Result<OfferDlc, Error> {
430        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
431            &self.secp,
432            contract_input,
433            oracle_announcements,
434            vec![],
435            REFUND_DELAY,
436            &counter_party,
437            &self.wallet,
438            &self.blockchain,
439            &self.time,
440            &self.signer_provider,
441            &self.logger,
442        )
443        .await?;
444
445        offered_contract.validate()?;
446
447        self.store.create_contract(&offered_contract).await?;
448
449        Ok(offer_msg)
450    }
451
452    /// Function to call to accept a DLC for which an offer was received.
453    #[tracing::instrument(skip_all)]
454    pub async fn accept_contract_offer(
455        &self,
456        contract_id: &ContractId,
457    ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
458        let offered_contract =
459            get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
460
461        let counter_party = offered_contract.counter_party;
462
463        let (accepted_contract, accept_msg) = accept_contract(
464            &self.secp,
465            &offered_contract,
466            &self.wallet,
467            &self.signer_provider,
468            &self.blockchain,
469            &self.logger,
470        )
471        .await?;
472
473        self.wallet.import_address(&Address::p2wsh(
474            &accepted_contract.dlc_transactions.funding_script_pubkey,
475            self.blockchain.get_network()?,
476        ))?;
477
478        let contract_id = accepted_contract.get_contract_id();
479
480        self.store
481            .update_contract(&Contract::Accepted(accepted_contract))
482            .await?;
483
484        log_info!(
485            self.logger,
486            "Accepted and stored the contract. temp_id={} contract_id={}",
487            offered_contract.id.to_lower_hex_string(),
488            contract_id.to_lower_hex_string(),
489        );
490
491        Ok((contract_id, counter_party, accept_msg))
492    }
493
494    /// Function to update the state of the [`ChainMonitor`] with new
495    /// blocks.
496    ///
497    /// Consumers **MUST** call this periodically in order to
498    /// determine when pending transactions reach confirmation.
499    #[tracing::instrument(skip_all)]
500    pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
501        let cur_height = self.blockchain.get_blockchain_height().await?;
502        let last_height = self.chain_monitor.lock().await.last_height;
503
504        // TODO(luckysori): We could end up reprocessing a block at
505        // the same height if there is a reorg.
506        if cur_height < last_height {
507            return Err(Error::InvalidState(
508                "Current height is lower than last height.".to_string(),
509            ));
510        }
511
512        for height in last_height + 1..=cur_height {
513            let block = self.blockchain.get_block_at_height(height).await?;
514
515            self.chain_monitor
516                .lock()
517                .await
518                .process_block(&block, height);
519        }
520
521        Ok(())
522    }
523
524    /// Function to call to check the state of the currently executing DLCs and
525    /// update them if possible.
526    #[tracing::instrument(skip_all, level = "debug")]
527    pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
528        let signed_contracts = self.check_for_spliced_contract().await?;
529        self.check_signed_contracts(&signed_contracts).await?;
530        self.check_confirmed_contracts().await?;
531        self.check_preclosed_contracts().await?;
532
533        if check_channels {
534            self.channel_checks().await?;
535        }
536
537        Ok(())
538    }
539
540    /// Function to call to offer a DLC.
541    #[tracing::instrument(skip_all)]
542    pub async fn on_offer_message(
543        &self,
544        offered_message: &OfferDlc,
545        counter_party: PublicKey,
546    ) -> Result<(), Error> {
547        offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
548        let keys_id = self
549            .signer_provider
550            .derive_signer_key_id(false, offered_message.temporary_contract_id);
551        let contract: OfferedContract =
552            OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
553        contract.validate()?;
554
555        if self.store.get_contract(&contract.id).await?.is_some() {
556            return Err(Error::InvalidParameters(
557                "Contract with identical id already exists".to_string(),
558            ));
559        }
560
561        self.store.create_contract(&contract).await?;
562        log_info!(
563            self.logger,
564            "Created and stored the offered contract. temp_id={}",
565            contract.id.to_lower_hex_string(),
566        );
567        Ok(())
568    }
569
570    /// Function to call to close a DLC.
571    #[tracing::instrument(skip_all)]
572    pub async fn on_close_message(
573        &self,
574        close_msg: &CloseDlc,
575        counter_party: &PublicKey,
576    ) -> Result<(), Error> {
577        // Validate that the contract exists and is in the correct state
578        let signed_contract = get_contract_in_state!(
579            self,
580            &close_msg.contract_id,
581            Confirmed,
582            Some(*counter_party)
583        )?;
584
585        // Validate the close message by attempting to construct the close transaction
586        // This verifies the signature and transaction structure without broadcasting
587        let _close_tx = crate::contract_updater::complete_cooperative_close(
588            &self.secp,
589            &signed_contract,
590            close_msg,
591            &self.signer_provider,
592            &self.logger,
593        )?;
594
595        // Message is valid - the application layer should call accept_cooperative_close()
596        // if they want to accept the offered terms
597        Ok(())
598    }
599
600    /// Function to call to accept a DLC for which an offer was received.
601    #[tracing::instrument(skip_all)]
602    pub async fn on_accept_message(
603        &self,
604        accept_msg: &AcceptDlc,
605        counter_party: &PublicKey,
606    ) -> Result<DlcMessage, Error> {
607        let offered_contract = get_contract_in_state!(
608            self,
609            &accept_msg.temporary_contract_id,
610            Offered,
611            Some(*counter_party)
612        )?;
613
614        let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
615            &self.secp,
616            &offered_contract,
617            accept_msg,
618            &self.wallet,
619            &self.signer_provider,
620            &self.store,
621            &self.logger,
622        )
623        .await
624        {
625            Ok(contract) => contract,
626            Err(e) => {
627                log_error!(
628                    self.logger,
629                    "Error in on_accept_message. tmp_contract_id={} error={}",
630                    offered_contract.id.to_lower_hex_string(),
631                    e.to_string()
632                );
633                return self
634                    .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
635                    .await;
636            }
637        };
638
639        log_info!(
640            self.logger,
641            "Verified the accept message and signed the contract. temp_id={} contract_id={}",
642            offered_contract.id.to_lower_hex_string(),
643            signed_contract.accepted_contract.get_contract_id_string(),
644        );
645
646        let contract_id = signed_contract.accepted_contract.get_contract_id_string();
647
648        self.wallet.import_address(&Address::p2wsh(
649            &signed_contract
650                .accepted_contract
651                .dlc_transactions
652                .funding_script_pubkey,
653            self.blockchain.get_network()?,
654        ))?;
655
656        self.store
657            .update_contract(&Contract::Signed(signed_contract))
658            .await?;
659
660        log_info!(
661            self.logger,
662            "Accepted and signed the contract. temp_id={} contract_id={}",
663            offered_contract.id.to_lower_hex_string(),
664            contract_id,
665        );
666
667        Ok(DlcMessage::Sign(signed_msg))
668    }
669
670    /// Function to call to sign a DLC for which an accept was received.
671    #[tracing::instrument(skip_all)]
672    pub async fn on_sign_message(
673        &self,
674        sign_message: &SignDlc,
675        peer_id: &PublicKey,
676    ) -> Result<(), Error> {
677        let accepted_contract =
678            get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
679        let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
680            &self.secp,
681            &accepted_contract,
682            sign_message,
683            &self.wallet,
684            &self.store,
685            &self.signer_provider,
686            &self.logger,
687        )
688        .await
689        {
690            Ok(contract) => contract,
691            Err(e) => {
692                log_error!(
693                    self.logger,
694                    "Error in on_sign_message. contract_id={} error={}",
695                    accepted_contract.get_contract_id_string(),
696                    e.to_string()
697                );
698                return self
699                    .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
700                    .await;
701            }
702        };
703
704        self.store
705            .update_contract(&Contract::Signed(signed_contract.clone()))
706            .await?;
707
708        self.blockchain.send_transaction(&fund_tx).await?;
709
710        // Check if there are any DLC inputs in the funding inputs of the contract.
711        // If there are, mark the contract as pre-closed.
712        if accepted_contract
713            .offered_contract
714            .funding_inputs
715            .iter()
716            .any(|c| c.dlc_input.is_some())
717        {
718            let Some(dlc_input) = accepted_contract
719                .offered_contract
720                .funding_inputs
721                .iter()
722                .find(|c| c.dlc_input.is_some())
723            else {
724                return Ok(());
725            };
726            let preclosed_contract = get_contract_in_state!(
727                self,
728                &dlc_input.dlc_input.as_ref().unwrap().contract_id,
729                Confirmed,
730                None as Option<PublicKey>
731            )?;
732
733            let preclosed_contract = PreClosedContract {
734                signed_contract: preclosed_contract.clone(),
735                attestations: None,
736                signed_cet: signed_contract
737                    .accepted_contract
738                    .dlc_transactions
739                    .fund
740                    .clone(),
741            };
742
743            log_debug!(self.logger,
744                "Contract contains a DLC input. Marking the previous contract as pre-closed. dlc_input_contract_id={}", 
745                preclosed_contract.signed_contract.accepted_contract.get_contract_id_string()
746            );
747
748            self.store
749                .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
750                .await?;
751        }
752
753        Ok(())
754    }
755
756    #[tracing::instrument(skip_all, level = "debug")]
757    async fn get_oracle_announcements(
758        &self,
759        oracle_inputs: &OracleInput,
760    ) -> Result<Vec<OracleAnnouncement>, Error> {
761        let mut announcements = Vec::new();
762        for pubkey in &oracle_inputs.public_keys {
763            let oracle = self
764                .oracles
765                .get(pubkey)
766                .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
767            let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
768            announcements.push(announcement);
769        }
770
771        Ok(announcements)
772    }
773
774    async fn sign_fail_on_error<R>(
775        &self,
776        accepted_contract: AcceptedContract,
777        sign_message: SignDlc,
778        e: Error,
779    ) -> Result<R, Error> {
780        log_error!(
781            self.logger,
782            "Error in on_sign_message marking contract as failed sign. contract_id={} error={}",
783            accepted_contract.get_contract_id_string(),
784            e.to_string()
785        );
786        self.store
787            .update_contract(&Contract::FailedSign(FailedSignContract {
788                accepted_contract,
789                sign_message,
790                error_message: e.to_string(),
791            }))
792            .await?;
793        Err(e)
794    }
795
796    async fn accept_fail_on_error<R>(
797        &self,
798        offered_contract: OfferedContract,
799        accept_message: AcceptDlc,
800        e: Error,
801    ) -> Result<R, Error> {
802        log_error!(
803            self.logger,
804            "Error in on_accept_message marking contract as failed accept. contract_id={} error={}",
805            offered_contract.id.to_lower_hex_string(),
806            e.to_string()
807        );
808        self.store
809            .update_contract(&Contract::FailedAccept(FailedAcceptContract {
810                offered_contract,
811                accept_message,
812                error_message: e.to_string(),
813            }))
814            .await?;
815        Err(e)
816    }
817
818    #[tracing::instrument(skip_all, level = "debug")]
819    async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
820        let confirmations = self
821            .blockchain
822            .get_transaction_confirmations(
823                &contract
824                    .accepted_contract
825                    .dlc_transactions
826                    .fund
827                    .compute_txid(),
828            )
829            .await?;
830        if confirmations >= *NB_CONFIRMATIONS {
831            log_info!(
832                self.logger,
833                "Marking signed contract as confirmed. confirmations={} contract_id={}",
834                confirmations,
835                contract.accepted_contract.get_contract_id_string(),
836            );
837            self.store
838                .update_contract(&Contract::Confirmed(contract.clone()))
839                .await?;
840        } else {
841            log_debug!(self.logger,
842                "Not enough confirmations to mark contract as confirmed. confirmations={} required={} contract_id={}", 
843                confirmations,
844                *NB_CONFIRMATIONS,
845                contract.accepted_contract.get_contract_id_string(),
846            );
847        }
848        Ok(())
849    }
850
851    #[tracing::instrument(skip_all, level = "debug")]
852    async fn check_signed_contracts(
853        &self,
854        signed_contracts: &[SignedContract],
855    ) -> Result<(), Error> {
856        for c in signed_contracts {
857            if let Err(e) = self.check_signed_contract(c).await {
858                log_error!(
859                    self.logger,
860                    "Error checking signed contract. contract_id={} error={}",
861                    c.accepted_contract.get_contract_id_string(),
862                    e.to_string()
863                )
864            }
865        }
866
867        Ok(())
868    }
869
870    #[tracing::instrument(skip_all, level = "debug")]
871    async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
872        let contracts = self.get_store().get_signed_contracts().await?;
873        for contract in &contracts {
874            let dlc_input = contract
875                .accepted_contract
876                .offered_contract
877                .funding_inputs
878                .iter()
879                .find(|d| d.dlc_input.is_some());
880
881            let Some(funding_input) = dlc_input else {
882                continue;
883            };
884
885            let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
886
887            let confirmed_contract_in_splice = match get_contract_in_state!(
888                self,
889                &contract_id,
890                Confirmed,
891                None as Option<PublicKey>
892            ) {
893                Ok(c) => Ok(c),
894                Err(Error::InvalidState(e)) => {
895                    log_trace!(self.logger,
896                        "The previouse contract referenced in a splice transaction is in an unexpected state. contract_id={} error={}", 
897                        contract_id.to_lower_hex_string(), e.to_string(),
898                    );
899                    continue;
900                }
901                Err(e) => {
902                    log_debug!(self.logger,
903                        "The previouse contract referenced in a splice transaction failed to retrieve. contract_id={} error={}", 
904                        contract_id.to_lower_hex_string(), e.to_string(),
905                    );
906                    Err(e)
907                }
908            }?;
909
910            let preclosed_contract = PreClosedContract {
911                signed_contract: confirmed_contract_in_splice.clone(),
912                attestations: None,
913                signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
914            };
915
916            log_debug!(self.logger,
917                "The previous contract that was spliced is now closed because the splice contract is confirmed. contract_id={}", 
918                contract_id.to_lower_hex_string(),
919            );
920
921            self.store
922                .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
923                .await?;
924        }
925        Ok(contracts)
926    }
927
928    #[tracing::instrument(skip_all, level = "debug")]
929    async fn check_confirmed_contracts(&self) -> Result<(), Error> {
930        for c in self.store.get_confirmed_contracts().await? {
931            // Confirmed contracts from channel are processed in channel specific methods.
932            if c.channel_id.is_some() {
933                continue;
934            }
935            if let Err(e) = self.check_confirmed_contract(&c).await {
936                log_error!(
937                    self.logger,
938                    "Error checking confirmed contract. contract_id={} error={}",
939                    c.accepted_contract.get_contract_id_string(),
940                    e.to_string()
941                )
942            }
943        }
944
945        Ok(())
946    }
947
948    #[tracing::instrument(skip_all, level = "debug")]
949    async fn get_closable_contract_info<'a>(
950        &'a self,
951        contract: &'a SignedContract,
952    ) -> ClosableContractInfo<'a> {
953        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
954        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
955        for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
956            log_debug!(
957                self.logger,
958                "Checking contract for oracle maturation. contract_id={}",
959                contract.accepted_contract.get_contract_id_string()
960            );
961            let matured: Vec<_> = contract_info
962                .oracle_announcements
963                .iter()
964                .filter(|x| {
965                    (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
966                })
967                .enumerate()
968                .collect();
969            if matured.len() >= contract_info.threshold {
970                let attestations = stream::iter(matured.iter())
971                    .map(|(i, announcement)| async move {
972                        log_debug!(self.logger,
973                            "Oracle announcement for contract is matured. Getting attestations. contract_id={} event_id={}", 
974                            contract.accepted_contract.get_contract_id_string(),
975                            announcement.oracle_event.event_id
976                        );
977                        // First try to get the oracle
978                        let oracle = match self.oracles.get(&announcement.oracle_public_key) {
979                            Some(oracle) => oracle,
980                            None => {
981                                log_debug!(self.logger,
982                                    "Oracle not found. pubkey={}. contract_id={} event_id={}", 
983                                    announcement.oracle_public_key,
984                                    contract.accepted_contract.get_contract_id_string(),
985                                    announcement.oracle_event.event_id
986                                );
987                                return None;
988                            }
989                        };
990                        // Then try to get the attestation
991                        let attestation = match oracle
992                            .get_attestation(&announcement.oracle_event.event_id)
993                            .await
994                        {
995                            Ok(attestation) => attestation,
996                            Err(e) => {
997                                log_error!(self.logger,
998                                    "Attestation not found for event. pubkey={} event_id={} error={}",
999                                    announcement.oracle_public_key,
1000                                    announcement.oracle_event.event_id,
1001                                    e.to_string()
1002                                );
1003                                return None;
1004                            }
1005                        };
1006                        // Validate the attestation
1007                        if let Err(e) = attestation.validate(&self.secp, announcement) {
1008                            log_error!(self.logger,
1009                                "Oracle attestation is not valid. pubkey={} event_id={}, error={}",
1010                                announcement.oracle_public_key,
1011                                announcement.oracle_event.event_id,
1012                                e.to_string()
1013                            );
1014                            return None;
1015                        }
1016                        log_info!(self.logger,
1017                            "Retrieved a valid attestation. pubkey={} event_id={} outcomes={:?}", 
1018                            announcement.oracle_public_key,
1019                            announcement.oracle_event.event_id,
1020                            attestation.outcomes
1021                        );
1022                        Some((*i, attestation))
1023                    })
1024                    .collect::<FuturesUnordered<_>>()
1025                    .await
1026                    .filter_map(|result| async move { result }) // Filter out None values
1027                    .collect::<Vec<_>>()
1028                    .await;
1029                if attestations.len() >= contract_info.threshold {
1030                    log_info!(self.logger,
1031                        "Found enough attestations to close contract. contract_id={} attestations={}", 
1032                        contract.accepted_contract.get_contract_id_string(),
1033                        attestations.len()
1034                    );
1035                    return Some((contract_info, adaptor_info, attestations));
1036                }
1037            }
1038        }
1039        None
1040    }
1041
1042    #[tracing::instrument(skip_all, level = "debug")]
1043    async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
1044        log_debug!(
1045            self.logger,
1046            "Checking confirmed contract. contract_id={}",
1047            contract.accepted_contract.get_contract_id_string()
1048        );
1049        let closable_contract_info = self.get_closable_contract_info(contract).await;
1050        if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
1051            log_debug!(
1052                self.logger,
1053                "Found closable contract info. contract_id={} attestations={}",
1054                contract.accepted_contract.get_contract_id_string(),
1055                attestations.len()
1056            );
1057            let offer = &contract.accepted_contract.offered_contract;
1058            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1059
1060            //  === WARNING ===
1061            // This code could potentially be problematic. When running refund tests, it would look for a CET
1062            // but the CET would be invalid and refund would not pass. By only updating with a valid CET,
1063            // we then go to update. This way if it fails we can check for refund instead of bailing and getting locked
1064            // funds.
1065            if let Ok(cet) = crate::contract_updater::get_signed_cet(
1066                &self.secp,
1067                contract,
1068                contract_info,
1069                adaptor_info,
1070                &attestations,
1071                &signer,
1072                &self.logger,
1073            ) {
1074                log_info!(
1075                    self.logger,
1076                    "Found valid CET. Closing contract. contract_id={}",
1077                    contract.accepted_contract.get_contract_id_string()
1078                );
1079                match self
1080                    .close_contract(
1081                        contract,
1082                        cet,
1083                        attestations.iter().map(|x| x.1.clone()).collect(),
1084                    )
1085                    .await
1086                {
1087                    Ok(closed_contract) => {
1088                        log_info!(
1089                            self.logger,
1090                            "Updated contract to closed. contract_id={}",
1091                            contract.accepted_contract.get_contract_id_string()
1092                        );
1093                        self.store.update_contract(&closed_contract).await?;
1094                        return Ok(());
1095                    }
1096                    Err(e) => {
1097                        log_warn!(
1098                            self.logger,
1099                            "Failed to close contract. contract_id={} error={}",
1100                            contract.accepted_contract.get_contract_id_string(),
1101                            e.to_string()
1102                        );
1103                        return Err(e);
1104                    }
1105                }
1106            }
1107        }
1108
1109        // Check each pending close transaction
1110        for pending_close_tx in &contract
1111            .accepted_contract
1112            .dlc_transactions
1113            .pending_close_txs
1114        {
1115            let confirmations = self
1116                .blockchain
1117                .get_transaction_confirmations(&pending_close_tx.compute_txid())
1118                .await?;
1119
1120            log_debug!(
1121                self.logger,
1122                "Checking pending close transaction. close_txid={} contract_id={} confirmations={}",
1123                pending_close_tx.compute_txid().to_string(),
1124                contract.accepted_contract.get_contract_id_string(),
1125                confirmations
1126            );
1127
1128            if confirmations >= *NB_CONFIRMATIONS {
1129                // Found a fully confirmed pending close - move directly to Closed
1130                log_info!(self.logger,
1131                    "Pending close transaction is fully confirmed. Moving to closed. close_txid={} contract_id={}", 
1132                    pending_close_tx.compute_txid().to_string(),
1133                    contract.accepted_contract.get_contract_id_string()
1134                );
1135                let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
1136                let closed_contract = ClosedContract {
1137                    attestations: None, // Cooperative close has no attestations
1138                    signed_cet: Some(pending_close_tx.clone()), // Cooperative close doesn't use a CET
1139                    contract_id: contract.accepted_contract.get_contract_id(),
1140                    temporary_contract_id: contract.accepted_contract.offered_contract.id,
1141                    counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1142                    pnl,
1143                    funding_txid: contract
1144                        .accepted_contract
1145                        .dlc_transactions
1146                        .fund
1147                        .compute_txid(),
1148                    signed_contract: contract.clone(),
1149                };
1150
1151                self.store
1152                    .update_contract(&Contract::Closed(closed_contract))
1153                    .await?;
1154                break; // Only one close can be confirmed
1155            } else if confirmations >= 1 {
1156                // Found a confirmed but not fully confirmed pending close - move to PreClosed
1157                log_debug!(self.logger,
1158                    "Found a confirmed but not fully confirmed pending close. Moving to preclosed. close_txid={} contract_id={}", 
1159                    pending_close_tx.compute_txid().to_string(),
1160                    contract.accepted_contract.get_contract_id_string()
1161                );
1162                let preclosed_contract = PreClosedContract {
1163                    signed_contract: contract.clone(),
1164                    attestations: None, // Cooperative close has no attestations
1165                    signed_cet: pending_close_tx.clone(),
1166                };
1167
1168                self.store
1169                    .update_contract(&Contract::PreClosed(preclosed_contract))
1170                    .await?;
1171                break; // Only one close can be confirmed
1172            }
1173        }
1174
1175        self.check_refund(contract).await?;
1176
1177        Ok(())
1178    }
1179
1180    /// Manually close a contract with the oracle attestations.
1181    #[tracing::instrument(skip_all, level = "debug")]
1182    pub async fn close_confirmed_contract(
1183        &self,
1184        contract_id: &ContractId,
1185        attestations: Vec<(usize, OracleAttestation)>,
1186    ) -> Result<Contract, Error> {
1187        log_info!(
1188            self.logger,
1189            "Attempting to close confirmed contract manually. contract_id={} outcomes={:?}",
1190            contract_id.to_lower_hex_string(),
1191            attestations
1192                .iter()
1193                .map(|(_, a)| a.outcomes.clone())
1194                .collect::<Vec<_>>()
1195        );
1196        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1197        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
1198        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
1199
1200        // find the contract info that matches the attestations
1201        if let Some((contract_info, adaptor_info)) =
1202            contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
1203                let matches = attestations
1204                    .iter()
1205                    .filter(|(i, a)| {
1206                        c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
1207                    })
1208                    .count();
1209
1210                matches >= c.threshold
1211            })
1212        {
1213            let offer = &contract.accepted_contract.offered_contract;
1214            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1215            log_debug!(
1216                self.logger,
1217                "Getting signed CET. contract_id={}",
1218                contract.accepted_contract.get_contract_id_string()
1219            );
1220            let cet = crate::contract_updater::get_signed_cet(
1221                &self.secp,
1222                &contract,
1223                contract_info,
1224                adaptor_info,
1225                &attestations,
1226                &signer,
1227                &self.logger,
1228            )?;
1229
1230            // Check that the lock time has passed
1231            let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
1232                .expect("Time is not in valid range. This should never happen.");
1233            let height =
1234                Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
1235                    .expect("Height is not in valid range. This should never happen.");
1236            let locktime = cet.lock_time;
1237
1238            if !locktime.is_satisfied_by(height, time) {
1239                return Err(Error::InvalidState(
1240                    "CET lock time has not passed yet".to_string(),
1241                ));
1242            }
1243
1244            match self
1245                .close_contract(
1246                    &contract,
1247                    cet,
1248                    attestations.into_iter().map(|x| x.1).collect(),
1249                )
1250                .await
1251            {
1252                Ok(closed_contract) => {
1253                    log_info!(
1254                        self.logger,
1255                        "Closed contract manually. contract_id={}",
1256                        contract.accepted_contract.get_contract_id_string()
1257                    );
1258                    self.store.update_contract(&closed_contract).await?;
1259                    Ok(closed_contract)
1260                }
1261                Err(e) => {
1262                    log_error!(
1263                        self.logger,
1264                        "Failed to close contract. contract_id={} error={}",
1265                        contract.accepted_contract.get_contract_id_string(),
1266                        e.to_string()
1267                    );
1268                    Err(e)
1269                }
1270            }
1271        } else {
1272            Err(Error::InvalidState(
1273                "Attestations did not match contract infos".to_string(),
1274            ))
1275        }
1276    }
1277
1278    #[tracing::instrument(skip_all, level = "debug")]
1279    async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1280        for c in self.store.get_preclosed_contracts().await? {
1281            if let Err(e) = self.check_preclosed_contract(&c).await {
1282                log_error!(
1283                    self.logger,
1284                    "Error checking pre-closed contract. contract_id={} error={}",
1285                    c.signed_contract.accepted_contract.get_contract_id_string(),
1286                    e
1287                )
1288            }
1289        }
1290
1291        Ok(())
1292    }
1293
1294    #[tracing::instrument(skip_all, level = "debug")]
1295    async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1296        let broadcasted_txid = contract.signed_cet.compute_txid();
1297        let confirmations = self
1298            .blockchain
1299            .get_transaction_confirmations(&broadcasted_txid)
1300            .await?;
1301        log_debug!(
1302            self.logger,
1303            "Checking pre-closed contract. broadcasted_txid={} contract_id={} confirmations={}",
1304            broadcasted_txid.to_string(),
1305            contract
1306                .signed_contract
1307                .accepted_contract
1308                .get_contract_id_string(),
1309            confirmations
1310        );
1311        if confirmations >= *NB_CONFIRMATIONS {
1312            log_debug!(self.logger,
1313                "Pre-closed contract is fully confirmed. Moving to closed. broadcasted_txid={} contract_id={}", 
1314                broadcasted_txid.to_string(),
1315                contract.signed_contract.accepted_contract.get_contract_id_string()
1316            );
1317            let pnl = contract
1318                .signed_contract
1319                .accepted_contract
1320                .compute_pnl(&contract.signed_cet);
1321
1322            let closed_contract = ClosedContract {
1323                attestations: contract.attestations.clone(),
1324                signed_cet: Some(contract.signed_cet.clone()),
1325                contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1326                temporary_contract_id: contract
1327                    .signed_contract
1328                    .accepted_contract
1329                    .offered_contract
1330                    .id,
1331                counter_party_id: contract
1332                    .signed_contract
1333                    .accepted_contract
1334                    .offered_contract
1335                    .counter_party,
1336                funding_txid: contract
1337                    .signed_contract
1338                    .accepted_contract
1339                    .dlc_transactions
1340                    .fund
1341                    .compute_txid(),
1342                pnl,
1343                signed_contract: contract.signed_contract.clone(),
1344            };
1345            self.store
1346                .update_contract(&Contract::Closed(closed_contract))
1347                .await?;
1348        }
1349
1350        Ok(())
1351    }
1352
1353    #[tracing::instrument(skip_all, level = "debug")]
1354    async fn close_contract(
1355        &self,
1356        contract: &SignedContract,
1357        signed_cet: Transaction,
1358        attestations: Vec<OracleAttestation>,
1359    ) -> Result<Contract, Error> {
1360        let confirmations = self
1361            .blockchain
1362            .get_transaction_confirmations(&signed_cet.compute_txid())
1363            .await?;
1364
1365        if confirmations < 1 {
1366            log_info!(
1367                self.logger,
1368                "Broadcasting signed CET. txid={} contract_id={}",
1369                signed_cet.compute_txid().to_string(),
1370                contract.accepted_contract.get_contract_id_string()
1371            );
1372            // TODO(tibo): if this fails because another tx is already in
1373            // mempool or blockchain, we might have been cheated. There is
1374            // not much to be done apart from possibly extracting a fraud
1375            // proof but ideally it should be handled.
1376            self.blockchain.send_transaction(&signed_cet).await?;
1377
1378            let preclosed_contract = PreClosedContract {
1379                signed_contract: contract.clone(),
1380                attestations: Some(attestations),
1381                signed_cet,
1382            };
1383
1384            return Ok(Contract::PreClosed(preclosed_contract));
1385        } else if confirmations < *NB_CONFIRMATIONS {
1386            log_debug!(self.logger,
1387                "Found a confirmed but not fully confirmed pending close. Moving to preclosed. txid={} contract_id={}", 
1388                signed_cet.compute_txid().to_string(),
1389                contract.accepted_contract.get_contract_id_string()
1390            );
1391            let preclosed_contract = PreClosedContract {
1392                signed_contract: contract.clone(),
1393                attestations: Some(attestations),
1394                signed_cet,
1395            };
1396
1397            return Ok(Contract::PreClosed(preclosed_contract));
1398        }
1399
1400        let closed_contract = ClosedContract {
1401            attestations: Some(attestations.to_vec()),
1402            pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1403            signed_cet: Some(signed_cet),
1404            contract_id: contract.accepted_contract.get_contract_id(),
1405            temporary_contract_id: contract.accepted_contract.offered_contract.id,
1406            funding_txid: contract
1407                .accepted_contract
1408                .dlc_transactions
1409                .fund
1410                .compute_txid(),
1411            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1412            signed_contract: contract.clone(),
1413        };
1414
1415        Ok(Contract::Closed(closed_contract))
1416    }
1417
1418    // TODO: Make this public to refund
1419    #[tracing::instrument(skip_all, level = "debug")]
1420    async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1421        // TODO(tibo): should check for confirmation of refund before updating state
1422        if contract
1423            .accepted_contract
1424            .dlc_transactions
1425            .refund
1426            .lock_time
1427            .to_consensus_u32() as u64
1428            <= self.time.unix_time_now()
1429        {
1430            log_debug!(self.logger,
1431                "Refund locktime has passed. Attempting to broadcast refund. refund_txid={} contract_id={}", 
1432                contract.accepted_contract.dlc_transactions.refund.compute_txid().to_string(),
1433                contract.accepted_contract.get_contract_id_string()
1434            );
1435            let accepted_contract = &contract.accepted_contract;
1436            let refund = accepted_contract.dlc_transactions.refund.clone();
1437            let confirmations = self
1438                .blockchain
1439                .get_transaction_confirmations(&refund.compute_txid())
1440                .await?;
1441            if confirmations == 0 {
1442                log_debug!(self.logger,
1443                    "Refund transaction has not been broadcast yet. Sending transaction txid={} contract_id={}", 
1444                    refund.compute_txid().to_string(),
1445                    contract.accepted_contract.get_contract_id_string()
1446                );
1447                let offer = &contract.accepted_contract.offered_contract;
1448                let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1449                let refund = crate::contract_updater::get_signed_refund(
1450                    &self.secp,
1451                    contract,
1452                    &signer,
1453                    &self.logger,
1454                )?;
1455                self.blockchain.send_transaction(&refund).await?;
1456            }
1457
1458            self.store
1459                .update_contract(&Contract::Refunded(contract.clone()))
1460                .await?;
1461        }
1462
1463        Ok(())
1464    }
1465
1466    /// Function to call when we detect that a contract was closed by our counter party.
1467    /// This will update the state of the contract and return the [`Contract`] object.
1468    #[tracing::instrument(skip_all, level = "debug")]
1469    pub async fn on_counterparty_close(
1470        &mut self,
1471        contract: &SignedContract,
1472        closing_tx: Transaction,
1473        confirmations: u32,
1474    ) -> Result<Contract, Error> {
1475        // check if the closing tx actually spends the funding output
1476        if !closing_tx.input.iter().any(|i| {
1477            i.previous_output
1478                == contract
1479                    .accepted_contract
1480                    .dlc_transactions
1481                    .get_fund_outpoint()
1482        }) {
1483            log_error!(
1484                self.logger,
1485                "Closing tx does not spend the funding tx. txid={} contract_id={}",
1486                closing_tx.compute_txid().to_string(),
1487                contract.accepted_contract.get_contract_id_string()
1488            );
1489            return Err(Error::InvalidParameters(
1490                "Closing tx does not spend the funding tx".to_string(),
1491            ));
1492        }
1493
1494        // check if it is the refund tx (easy case)
1495        if contract
1496            .accepted_contract
1497            .dlc_transactions
1498            .refund
1499            .compute_txid()
1500            == closing_tx.compute_txid()
1501        {
1502            log_debug!(
1503                self.logger,
1504                "Closing tx is the refund tx. Moving to refunded. txid={} contract_id={}",
1505                closing_tx.compute_txid().to_string(),
1506                contract.accepted_contract.get_contract_id_string()
1507            );
1508            let refunded = Contract::Refunded(contract.clone());
1509            self.store.update_contract(&refunded).await?;
1510            return Ok(refunded);
1511        }
1512
1513        let contract = if confirmations < *NB_CONFIRMATIONS {
1514            log_info!(self.logger,
1515                "Closing transaction is not fully confirmed. Moving to preclosed. txid={} contract_id={}", 
1516                closing_tx.compute_txid().to_string(),
1517                contract.accepted_contract.get_contract_id_string()
1518            );
1519            Contract::PreClosed(PreClosedContract {
1520                signed_contract: contract.clone(),
1521                attestations: None, // todo in some cases we can get the attestations from the closing tx
1522                signed_cet: closing_tx,
1523            })
1524        } else {
1525            log_info!(
1526                self.logger,
1527                "Closing transaction is fully confirmed. Moving to closed. txid={} contract_id={}",
1528                closing_tx.compute_txid().to_string(),
1529                contract.accepted_contract.get_contract_id_string()
1530            );
1531            Contract::Closed(ClosedContract {
1532                attestations: None, // todo in some cases we can get the attestations from the closing tx
1533                pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1534                signed_cet: Some(closing_tx),
1535                contract_id: contract.accepted_contract.get_contract_id(),
1536                temporary_contract_id: contract.accepted_contract.offered_contract.id,
1537                counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1538                funding_txid: contract
1539                    .accepted_contract
1540                    .dlc_transactions
1541                    .fund
1542                    .compute_txid(),
1543                signed_contract: contract.clone(),
1544            })
1545        };
1546
1547        self.store.update_contract(&contract).await?;
1548
1549        Ok(contract)
1550    }
1551
1552    /// Initiates a cooperative close of a contract by creating and signing a closing transaction.
1553    /// Returns a CloseDlc message to be sent to the counter party.
1554    /// The contract remains in Confirmed state until the close transaction is broadcast.
1555    #[tracing::instrument(skip_all, level = "debug")]
1556    pub async fn cooperative_close_contract(
1557        &self,
1558        contract_id: &ContractId,
1559        counter_payout: Amount,
1560    ) -> Result<(CloseDlc, PublicKey), Error> {
1561        let signed_contract =
1562            get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1563
1564        let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1565            &self.secp,
1566            &signed_contract,
1567            counter_payout,
1568            &self.signer_provider,
1569            &self.logger,
1570        )?;
1571
1572        // Create updated contract with pending close transaction
1573        let mut updated_dlc_transactions =
1574            signed_contract.accepted_contract.dlc_transactions.clone();
1575        updated_dlc_transactions
1576            .pending_close_txs
1577            .push(close_tx.clone());
1578        log_debug!(
1579            self.logger,
1580            "Created updated contract with pending close transaction. close_txid={} contract_id={}",
1581            close_tx.compute_txid().to_string(),
1582            contract_id.to_lower_hex_string()
1583        );
1584
1585        let updated_accepted_contract = AcceptedContract {
1586            dlc_transactions: updated_dlc_transactions,
1587            ..signed_contract.accepted_contract.clone()
1588        };
1589
1590        let updated_signed_contract = SignedContract {
1591            accepted_contract: updated_accepted_contract,
1592            ..signed_contract.clone()
1593        };
1594
1595        // Update contract state to track pending close
1596        self.store
1597            .update_contract(&Contract::Confirmed(updated_signed_contract))
1598            .await?;
1599
1600        let counter_party = signed_contract
1601            .accepted_contract
1602            .offered_contract
1603            .counter_party;
1604
1605        Ok((close_message, counter_party))
1606    }
1607
1608    /// Accepts a cooperative close request by completing the close transaction
1609    /// and broadcasting it to the network.
1610    #[tracing::instrument(skip_all, level = "debug")]
1611    pub async fn accept_cooperative_close(
1612        &self,
1613        contract_id: &ContractId,
1614        close_message: &CloseDlc,
1615    ) -> Result<(), Error> {
1616        let signed_contract =
1617            get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1618
1619        let close_tx = crate::contract_updater::complete_cooperative_close(
1620            &self.secp,
1621            &signed_contract,
1622            close_message,
1623            &self.signer_provider,
1624            &self.logger,
1625        )?;
1626
1627        log_debug!(
1628            self.logger,
1629            "Completed cooperative close. close_txid={} contract_id={}",
1630            close_tx.compute_txid().to_string(),
1631            contract_id.to_lower_hex_string()
1632        );
1633        // Broadcast the closing transaction
1634        self.blockchain.send_transaction(&close_tx).await?;
1635
1636        // Create PreClosed contract (transaction broadcast but not confirmed yet)
1637        let preclosed_contract = PreClosedContract {
1638            signed_contract,
1639            attestations: None,
1640            signed_cet: close_tx,
1641        };
1642
1643        self.store
1644            .update_contract(&Contract::PreClosed(preclosed_contract))
1645            .await?;
1646
1647        Ok(())
1648    }
1649}
1650
1651impl<
1652        W: Deref,
1653        SP: Deref,
1654        B: Deref,
1655        S: Deref,
1656        O: Deref,
1657        T: Deref,
1658        F: Deref,
1659        X: ContractSigner,
1660        L: Deref,
1661    > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
1662where
1663    W::Target: Wallet,
1664    SP::Target: ContractSignerProvider<Signer = X>,
1665    B::Target: Blockchain,
1666    S::Target: Storage,
1667    O::Target: Oracle,
1668    T::Target: Time,
1669    F::Target: FeeEstimator,
1670    L::Target: Logger,
1671{
1672    /// Create a new channel offer and return the [`dlc_messages::channel::OfferChannel`]
1673    /// message to be sent to the `counter_party`.
1674    pub async fn offer_channel(
1675        &self,
1676        contract_input: &ContractInput,
1677        counter_party: PublicKey,
1678    ) -> Result<OfferChannel, Error> {
1679        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1680
1681        let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1682            &self.secp,
1683            contract_input,
1684            &counter_party,
1685            &oracle_announcements,
1686            CET_NSEQUENCE,
1687            REFUND_DELAY,
1688            &self.wallet,
1689            &self.signer_provider,
1690            &self.blockchain,
1691            &self.time,
1692            crate::utils::get_new_temporary_id(),
1693        )
1694        .await?;
1695
1696        let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1697
1698        self.store
1699            .upsert_channel(
1700                Channel::Offered(offered_channel),
1701                Some(Contract::Offered(offered_contract)),
1702            )
1703            .await?;
1704
1705        Ok(msg)
1706    }
1707
1708    /// Reject a channel that was offered. Returns the [`dlc_messages::channel::Reject`]
1709    /// message to be sent as well as the public key of the offering node.
1710    pub async fn reject_channel(
1711        &self,
1712        channel_id: &ChannelId,
1713    ) -> Result<(Reject, PublicKey), Error> {
1714        let offered_channel =
1715            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1716
1717        if offered_channel.is_offer_party {
1718            return Err(Error::InvalidState(
1719                "Cannot reject channel initiated by us.".to_string(),
1720            ));
1721        }
1722
1723        let offered_contract = get_contract_in_state!(
1724            self,
1725            &offered_channel.offered_contract_id,
1726            Offered,
1727            None as Option<PublicKey>
1728        )?;
1729
1730        let counterparty = offered_channel.counter_party;
1731        self.store
1732            .upsert_channel(
1733                Channel::Cancelled(offered_channel),
1734                Some(Contract::Rejected(offered_contract)),
1735            )
1736            .await?;
1737
1738        let msg = Reject {
1739            channel_id: *channel_id,
1740        };
1741        Ok((msg, counterparty))
1742    }
1743
1744    /// Accept a channel that was offered. Returns the [`dlc_messages::channel::AcceptChannel`]
1745    /// message to be sent, the updated [`crate::ChannelId`] and [`crate::ContractId`],
1746    /// as well as the public key of the offering node.
1747    pub async fn accept_channel(
1748        &self,
1749        channel_id: &ChannelId,
1750    ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1751        let offered_channel =
1752            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1753
1754        if offered_channel.is_offer_party {
1755            return Err(Error::InvalidState(
1756                "Cannot accept channel initiated by us.".to_string(),
1757            ));
1758        }
1759
1760        let offered_contract = get_contract_in_state!(
1761            self,
1762            &offered_channel.offered_contract_id,
1763            Offered,
1764            None as Option<PublicKey>
1765        )?;
1766
1767        let (accepted_channel, accepted_contract, accept_channel) =
1768            crate::channel_updater::accept_channel_offer(
1769                &self.secp,
1770                &offered_channel,
1771                &offered_contract,
1772                &self.wallet,
1773                &self.signer_provider,
1774                &self.blockchain,
1775            )
1776            .await?;
1777
1778        self.wallet.import_address(&Address::p2wsh(
1779            &accepted_contract.dlc_transactions.funding_script_pubkey,
1780            self.blockchain.get_network()?,
1781        ))?;
1782
1783        let channel_id = accepted_channel.channel_id;
1784        let contract_id = accepted_contract.get_contract_id();
1785        let counter_party = accepted_contract.offered_contract.counter_party;
1786
1787        self.store
1788            .upsert_channel(
1789                Channel::Accepted(accepted_channel),
1790                Some(Contract::Accepted(accepted_contract)),
1791            )
1792            .await?;
1793
1794        Ok((accept_channel, channel_id, contract_id, counter_party))
1795    }
1796
1797    /// Force close the channel with given [`crate::ChannelId`].
1798    pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1799        let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1800
1801        self.force_close_channel_internal(channel, true).await
1802    }
1803
1804    /// Offer to settle the balance of a channel so that the counter party gets
1805    /// `counter_payout`. Returns the [`dlc_messages::channel::SettleChannelOffer`]
1806    /// message to be sent and the public key of the counter party node.
1807    pub async fn settle_offer(
1808        &self,
1809        channel_id: &ChannelId,
1810        counter_payout: Amount,
1811    ) -> Result<(SettleOffer, PublicKey), Error> {
1812        let mut signed_channel =
1813            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1814
1815        let msg = crate::channel_updater::settle_channel_offer(
1816            &self.secp,
1817            &mut signed_channel,
1818            counter_payout,
1819            PEER_TIMEOUT,
1820            &self.signer_provider,
1821            &self.time,
1822        )?;
1823
1824        let counter_party = signed_channel.counter_party;
1825
1826        self.store
1827            .upsert_channel(Channel::Signed(signed_channel), None)
1828            .await?;
1829
1830        Ok((msg, counter_party))
1831    }
1832
1833    /// Accept a settlement offer, returning the [`SettleAccept`] message to be
1834    /// sent to the node with the returned [`PublicKey`] id.
1835    pub async fn accept_settle_offer(
1836        &self,
1837        channel_id: &ChannelId,
1838    ) -> Result<(SettleAccept, PublicKey), Error> {
1839        let mut signed_channel =
1840            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1841
1842        let msg = crate::channel_updater::settle_channel_accept(
1843            &self.secp,
1844            &mut signed_channel,
1845            CET_NSEQUENCE,
1846            0,
1847            PEER_TIMEOUT,
1848            &self.signer_provider,
1849            &self.time,
1850            &self.chain_monitor,
1851        )
1852        .await?;
1853
1854        let counter_party = signed_channel.counter_party;
1855
1856        self.store
1857            .upsert_channel(Channel::Signed(signed_channel), None)
1858            .await?;
1859
1860        Ok((msg, counter_party))
1861    }
1862
1863    /// Returns a [`RenewOffer`] message as well as the [`PublicKey`] of the
1864    /// counter party's node to offer the establishment of a new contract in the
1865    /// channel.
1866    pub async fn renew_offer(
1867        &self,
1868        channel_id: &ChannelId,
1869        counter_payout: Amount,
1870        contract_input: &ContractInput,
1871    ) -> Result<(RenewOffer, PublicKey), Error> {
1872        let mut signed_channel =
1873            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1874
1875        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1876
1877        let (msg, offered_contract) = crate::channel_updater::renew_offer(
1878            &self.secp,
1879            &mut signed_channel,
1880            contract_input,
1881            oracle_announcements,
1882            counter_payout,
1883            REFUND_DELAY,
1884            PEER_TIMEOUT,
1885            CET_NSEQUENCE,
1886            &self.signer_provider,
1887            &self.time,
1888        )?;
1889
1890        let counter_party = offered_contract.counter_party;
1891
1892        self.store
1893            .upsert_channel(
1894                Channel::Signed(signed_channel),
1895                Some(Contract::Offered(offered_contract)),
1896            )
1897            .await?;
1898
1899        Ok((msg, counter_party))
1900    }
1901
1902    /// Accept an offer to renew the contract in the channel. Returns the
1903    /// [`RenewAccept`] message to be sent to the peer with the returned
1904    /// [`PublicKey`] as node id.
1905    pub async fn accept_renew_offer(
1906        &self,
1907        channel_id: &ChannelId,
1908    ) -> Result<(RenewAccept, PublicKey), Error> {
1909        let mut signed_channel =
1910            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1911        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1912            Error::InvalidState("Expected to have a contract id but did not.".to_string())
1913        })?;
1914
1915        let offered_contract = get_contract_in_state!(
1916            self,
1917            &offered_contract_id,
1918            Offered,
1919            None as Option<PublicKey>
1920        )?;
1921
1922        let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1923            &self.secp,
1924            &mut signed_channel,
1925            &offered_contract,
1926            CET_NSEQUENCE,
1927            PEER_TIMEOUT,
1928            &self.signer_provider,
1929            &self.time,
1930        )?;
1931
1932        let counter_party = signed_channel.counter_party;
1933
1934        self.store
1935            .upsert_channel(
1936                Channel::Signed(signed_channel),
1937                Some(Contract::Accepted(accepted_contract)),
1938            )
1939            .await?;
1940
1941        Ok((msg, counter_party))
1942    }
1943
1944    /// Reject an offer to renew the contract in the channel. Returns the
1945    /// [`Reject`] message to be sent to the peer with the returned
1946    /// [`PublicKey`] node id.
1947    pub async fn reject_renew_offer(
1948        &self,
1949        channel_id: &ChannelId,
1950    ) -> Result<(Reject, PublicKey), Error> {
1951        let mut signed_channel =
1952            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1953        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1954            Error::InvalidState(
1955                "Expected to be in a state with an associated contract id but was not.".to_string(),
1956            )
1957        })?;
1958
1959        let offered_contract = get_contract_in_state!(
1960            self,
1961            &offered_contract_id,
1962            Offered,
1963            None as Option<PublicKey>
1964        )?;
1965
1966        let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1967
1968        let counter_party = signed_channel.counter_party;
1969
1970        self.store
1971            .upsert_channel(
1972                Channel::Signed(signed_channel),
1973                Some(Contract::Rejected(offered_contract)),
1974            )
1975            .await?;
1976
1977        Ok((reject_msg, counter_party))
1978    }
1979
1980    /// Returns a [`Reject`] message to be sent to the counter party of the
1981    /// channel to inform them that the local party does not wish to accept the
1982    /// proposed settle offer.
1983    pub async fn reject_settle_offer(
1984        &self,
1985        channel_id: &ChannelId,
1986    ) -> Result<(Reject, PublicKey), Error> {
1987        let mut signed_channel =
1988            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1989
1990        let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1991
1992        let counter_party = signed_channel.counter_party;
1993
1994        self.store
1995            .upsert_channel(Channel::Signed(signed_channel), None)
1996            .await?;
1997
1998        Ok((msg, counter_party))
1999    }
2000
2001    /// Returns a [`CollaborativeCloseOffer`] message to be sent to the counter
2002    /// party of the channel and update the state of the channel. Note that the
2003    /// channel will be forced closed after a timeout if the counter party does
2004    /// not broadcast the close transaction.
2005    pub async fn offer_collaborative_close(
2006        &self,
2007        channel_id: &ChannelId,
2008        counter_payout: Amount,
2009        additional_inputs: Vec<OutPoint>,
2010    ) -> Result<CollaborativeCloseOffer, Error> {
2011        let mut signed_channel =
2012            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2013
2014        let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
2015            &self.secp,
2016            &mut signed_channel,
2017            counter_payout,
2018            additional_inputs,
2019            &self.signer_provider,
2020            &self.time,
2021        )?;
2022
2023        self.chain_monitor.lock().await.add_tx(
2024            close_tx.compute_txid(),
2025            ChannelInfo {
2026                channel_id: *channel_id,
2027                tx_type: TxType::CollaborativeClose,
2028            },
2029        );
2030
2031        self.store
2032            .upsert_channel(Channel::Signed(signed_channel), None)
2033            .await?;
2034        self.store
2035            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2036            .await?;
2037
2038        Ok(msg)
2039    }
2040
2041    /// Accept an offer to collaboratively close the channel. The close transaction
2042    /// will be broadcast and the state of the channel updated.
2043    pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
2044        let signed_channel =
2045            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2046
2047        let closed_contract = if let Some(SignedChannelState::Established {
2048            signed_contract_id,
2049            ..
2050        }) = &signed_channel.roll_back_state
2051        {
2052            let counter_payout = get_signed_channel_state!(
2053                signed_channel,
2054                CollaborativeCloseOffered,
2055                counter_payout
2056            )?;
2057            Some(
2058                self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
2059                    .await?,
2060            )
2061        } else {
2062            None
2063        };
2064
2065        let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
2066            &self.secp,
2067            &signed_channel,
2068            &self.signer_provider,
2069        )?;
2070
2071        self.blockchain.send_transaction(&close_tx).await?;
2072
2073        self.store.upsert_channel(closed_channel, None).await?;
2074
2075        if let Some(closed_contract) = closed_contract {
2076            self.store
2077                .update_contract(&Contract::Closed(closed_contract))
2078                .await?;
2079        }
2080
2081        Ok(())
2082    }
2083
2084    async fn try_finalize_closing_established_channel(
2085        &self,
2086        signed_channel: SignedChannel,
2087    ) -> Result<(), Error> {
2088        let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
2089            signed_channel,
2090            Closing,
2091            buffer_transaction,
2092            contract_id,
2093            is_initiator
2094        )?;
2095
2096        if self
2097            .blockchain
2098            .get_transaction_confirmations(&buffer_tx.compute_txid())
2099            .await?
2100            >= CET_NSEQUENCE
2101        {
2102            log_info!(
2103                self.logger,
2104                "Buffer transaction for contract {} has enough confirmations to spend from it",
2105                serialize_hex(&contract_id)
2106            );
2107
2108            let confirmed_contract =
2109                get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
2110
2111            let (contract_info, adaptor_info, attestations) = self
2112                .get_closable_contract_info(&confirmed_contract)
2113                .await
2114                .ok_or_else(|| {
2115                    Error::InvalidState("Could not get information to close contract".to_string())
2116                })?;
2117
2118            let (signed_cet, closed_channel) =
2119                crate::channel_updater::finalize_unilateral_close_settled_channel(
2120                    &self.secp,
2121                    &signed_channel,
2122                    &confirmed_contract,
2123                    contract_info,
2124                    &attestations,
2125                    adaptor_info,
2126                    &self.signer_provider,
2127                    is_initiator,
2128                )?;
2129
2130            let closed_contract = self
2131                .close_contract(
2132                    &confirmed_contract,
2133                    signed_cet,
2134                    attestations.iter().map(|x| &x.1).cloned().collect(),
2135                )
2136                .await?;
2137
2138            self.chain_monitor
2139                .lock()
2140                .await
2141                .cleanup_channel(signed_channel.channel_id);
2142
2143            self.store
2144                .upsert_channel(closed_channel, Some(closed_contract))
2145                .await?;
2146        }
2147
2148        Ok(())
2149    }
2150
2151    async fn on_offer_channel(
2152        &self,
2153        offer_channel: &OfferChannel,
2154        counter_party: PublicKey,
2155    ) -> Result<(), Error> {
2156        offer_channel.validate(
2157            &self.secp,
2158            REFUND_DELAY,
2159            REFUND_DELAY * 2,
2160            CET_NSEQUENCE,
2161            CET_NSEQUENCE * 2,
2162        )?;
2163
2164        let keys_id = self
2165            .signer_provider
2166            .derive_signer_key_id(false, offer_channel.temporary_contract_id);
2167        let (channel, contract) =
2168            OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
2169
2170        contract.validate()?;
2171
2172        if self
2173            .store
2174            .get_channel(&channel.temporary_channel_id)
2175            .await?
2176            .is_some()
2177        {
2178            return Err(Error::InvalidParameters(
2179                "Channel with identical idea already in store".to_string(),
2180            ));
2181        }
2182
2183        self.store
2184            .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
2185            .await?;
2186
2187        Ok(())
2188    }
2189
2190    async fn on_accept_channel(
2191        &self,
2192        accept_channel: &AcceptChannel,
2193        peer_id: &PublicKey,
2194    ) -> Result<SignChannel, Error> {
2195        let offered_channel = get_channel_in_state!(
2196            self,
2197            &accept_channel.temporary_channel_id,
2198            Offered,
2199            Some(*peer_id)
2200        )?;
2201        let offered_contract = get_contract_in_state!(
2202            self,
2203            &offered_channel.offered_contract_id,
2204            Offered,
2205            Some(*peer_id)
2206        )?;
2207
2208        let (signed_channel, signed_contract, sign_channel) = {
2209            let res = crate::channel_updater::verify_and_sign_accepted_channel(
2210                &self.secp,
2211                &offered_channel,
2212                &offered_contract,
2213                accept_channel,
2214                //TODO(tibo): this should be parameterizable.
2215                CET_NSEQUENCE,
2216                &self.wallet,
2217                &self.signer_provider,
2218                &self.store,
2219                &self.chain_monitor,
2220                &self.logger,
2221            )
2222            .await;
2223
2224            match res {
2225                Ok(res) => res,
2226                Err(e) => {
2227                    let channel = crate::channel::FailedAccept {
2228                        temporary_channel_id: accept_channel.temporary_channel_id,
2229                        error_message: format!("Error validating accept channel: {e}"),
2230                        accept_message: accept_channel.clone(),
2231                        counter_party: *peer_id,
2232                    };
2233                    self.store
2234                        .upsert_channel(Channel::FailedAccept(channel), None)
2235                        .await?;
2236                    return Err(e);
2237                }
2238            }
2239        };
2240
2241        self.wallet.import_address(&Address::p2wsh(
2242            &signed_contract
2243                .accepted_contract
2244                .dlc_transactions
2245                .funding_script_pubkey,
2246            self.blockchain.get_network()?,
2247        ))?;
2248
2249        if let SignedChannelState::Established {
2250            buffer_transaction, ..
2251        } = &signed_channel.state
2252        {
2253            self.chain_monitor.lock().await.add_tx(
2254                buffer_transaction.compute_txid(),
2255                ChannelInfo {
2256                    channel_id: signed_channel.channel_id,
2257                    tx_type: TxType::BufferTx,
2258                },
2259            );
2260        } else {
2261            unreachable!();
2262        }
2263
2264        self.store
2265            .upsert_channel(
2266                Channel::Signed(signed_channel),
2267                Some(Contract::Signed(signed_contract)),
2268            )
2269            .await?;
2270
2271        self.store
2272            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2273            .await?;
2274
2275        Ok(sign_channel)
2276    }
2277
2278    async fn on_sign_channel(
2279        &self,
2280        sign_channel: &SignChannel,
2281        peer_id: &PublicKey,
2282    ) -> Result<(), Error> {
2283        let accepted_channel =
2284            get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
2285        let accepted_contract = get_contract_in_state!(
2286            self,
2287            &accepted_channel.accepted_contract_id,
2288            Accepted,
2289            Some(*peer_id)
2290        )?;
2291
2292        let (signed_channel, signed_contract, signed_fund_tx) = {
2293            let res = verify_signed_channel(
2294                &self.secp,
2295                &accepted_channel,
2296                &accepted_contract,
2297                sign_channel,
2298                &self.wallet,
2299                &self.chain_monitor,
2300                &self.store,
2301                &self.signer_provider,
2302                &self.logger,
2303            )
2304            .await;
2305
2306            match res {
2307                Ok(res) => res,
2308                Err(e) => {
2309                    let channel = crate::channel::FailedSign {
2310                        channel_id: sign_channel.channel_id,
2311                        error_message: format!("Error validating accept channel: {e}"),
2312                        sign_message: sign_channel.clone(),
2313                        counter_party: *peer_id,
2314                    };
2315                    self.store
2316                        .upsert_channel(Channel::FailedSign(channel), None)
2317                        .await?;
2318                    return Err(e);
2319                }
2320            }
2321        };
2322
2323        if let SignedChannelState::Established {
2324            buffer_transaction, ..
2325        } = &signed_channel.state
2326        {
2327            self.chain_monitor.lock().await.add_tx(
2328                buffer_transaction.compute_txid(),
2329                ChannelInfo {
2330                    channel_id: signed_channel.channel_id,
2331                    tx_type: TxType::BufferTx,
2332                },
2333            );
2334        } else {
2335            unreachable!();
2336        }
2337
2338        self.blockchain.send_transaction(&signed_fund_tx).await?;
2339
2340        self.store
2341            .upsert_channel(
2342                Channel::Signed(signed_channel),
2343                Some(Contract::Signed(signed_contract)),
2344            )
2345            .await?;
2346        self.store
2347            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2348            .await?;
2349
2350        Ok(())
2351    }
2352
2353    async fn on_settle_offer(
2354        &self,
2355        settle_offer: &SettleOffer,
2356        peer_id: &PublicKey,
2357    ) -> Result<Option<Reject>, Error> {
2358        let mut signed_channel =
2359            get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2360
2361        if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2362            return Ok(Some(Reject {
2363                channel_id: settle_offer.channel_id,
2364            }));
2365        }
2366
2367        crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2368
2369        self.store
2370            .upsert_channel(Channel::Signed(signed_channel), None)
2371            .await?;
2372
2373        Ok(None)
2374    }
2375
2376    async fn on_settle_accept(
2377        &self,
2378        settle_accept: &SettleAccept,
2379        peer_id: &PublicKey,
2380    ) -> Result<SettleConfirm, Error> {
2381        let mut signed_channel =
2382            get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2383
2384        let msg = crate::channel_updater::settle_channel_confirm(
2385            &self.secp,
2386            &mut signed_channel,
2387            settle_accept,
2388            CET_NSEQUENCE,
2389            0,
2390            PEER_TIMEOUT,
2391            &self.signer_provider,
2392            &self.time,
2393            &self.chain_monitor,
2394        )
2395        .await?;
2396
2397        self.store
2398            .upsert_channel(Channel::Signed(signed_channel), None)
2399            .await?;
2400
2401        Ok(msg)
2402    }
2403
2404    async fn on_settle_confirm(
2405        &self,
2406        settle_confirm: &SettleConfirm,
2407        peer_id: &PublicKey,
2408    ) -> Result<SettleFinalize, Error> {
2409        let mut signed_channel =
2410            get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2411        let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2412        let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2413            signed_channel,
2414            Established,
2415            buffer_transaction,
2416            own_buffer_adaptor_signature,
2417            is_offer,
2418            signed_contract_id
2419        )?;
2420
2421        let prev_buffer_txid = prev_buffer_tx.compute_txid();
2422        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2423        let is_offer = *is_offer;
2424        let signed_contract_id = *signed_contract_id;
2425
2426        let msg = crate::channel_updater::settle_channel_finalize(
2427            &self.secp,
2428            &mut signed_channel,
2429            settle_confirm,
2430            &self.signer_provider,
2431        )?;
2432
2433        self.chain_monitor.lock().await.add_tx(
2434            prev_buffer_txid,
2435            ChannelInfo {
2436                channel_id: signed_channel.channel_id,
2437                tx_type: TxType::Revoked {
2438                    update_idx: signed_channel.update_idx + 1,
2439                    own_adaptor_signature: own_buffer_adaptor_signature,
2440                    is_offer,
2441                    revoked_tx_type: RevokedTxType::Buffer,
2442                },
2443            },
2444        );
2445
2446        let closed_contract = Contract::Closed(
2447            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2448                .await?,
2449        );
2450
2451        self.store
2452            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2453            .await?;
2454        self.store
2455            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2456            .await?;
2457
2458        Ok(msg)
2459    }
2460
2461    async fn on_settle_finalize(
2462        &self,
2463        settle_finalize: &SettleFinalize,
2464        peer_id: &PublicKey,
2465    ) -> Result<(), Error> {
2466        let mut signed_channel =
2467            get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2468        let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2469        let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2470            signed_channel,
2471            Established,
2472            buffer_transaction,
2473            own_buffer_adaptor_signature,
2474            is_offer,
2475            signed_contract_id
2476        )?;
2477
2478        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2479        let is_offer = *is_offer;
2480        let buffer_txid = buffer_tx.compute_txid();
2481        let signed_contract_id = *signed_contract_id;
2482
2483        crate::channel_updater::settle_channel_on_finalize(
2484            &self.secp,
2485            &mut signed_channel,
2486            settle_finalize,
2487        )?;
2488
2489        self.chain_monitor.lock().await.add_tx(
2490            buffer_txid,
2491            ChannelInfo {
2492                channel_id: signed_channel.channel_id,
2493                tx_type: TxType::Revoked {
2494                    update_idx: signed_channel.update_idx + 1,
2495                    own_adaptor_signature: own_buffer_adaptor_signature,
2496                    is_offer,
2497                    revoked_tx_type: RevokedTxType::Buffer,
2498                },
2499            },
2500        );
2501
2502        let closed_contract = Contract::Closed(
2503            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2504                .await?,
2505        );
2506        self.store
2507            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2508            .await?;
2509        self.store
2510            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2511            .await?;
2512
2513        Ok(())
2514    }
2515
2516    async fn on_renew_offer(
2517        &self,
2518        renew_offer: &RenewOffer,
2519        peer_id: &PublicKey,
2520    ) -> Result<Option<Reject>, Error> {
2521        let mut signed_channel =
2522            get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2523
2524        // Received a renew offer when we already sent one, we reject it.
2525        if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2526            if is_offer {
2527                return Ok(Some(Reject {
2528                    channel_id: renew_offer.channel_id,
2529                }));
2530            }
2531        }
2532
2533        let offered_contract = crate::channel_updater::on_renew_offer(
2534            &mut signed_channel,
2535            renew_offer,
2536            PEER_TIMEOUT,
2537            &self.time,
2538        )?;
2539
2540        self.store.create_contract(&offered_contract).await?;
2541        self.store
2542            .upsert_channel(Channel::Signed(signed_channel), None)
2543            .await?;
2544
2545        Ok(None)
2546    }
2547
2548    async fn on_renew_accept(
2549        &self,
2550        renew_accept: &RenewAccept,
2551        peer_id: &PublicKey,
2552    ) -> Result<RenewConfirm, Error> {
2553        let mut signed_channel =
2554            get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2555        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2556            Error::InvalidState(
2557                "Expected to be in a state with an associated contract id but was not.".to_string(),
2558            )
2559        })?;
2560
2561        let offered_contract =
2562            get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2563
2564        let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2565            &self.secp,
2566            renew_accept,
2567            &mut signed_channel,
2568            &offered_contract,
2569            CET_NSEQUENCE,
2570            PEER_TIMEOUT,
2571            &self.wallet,
2572            &self.signer_provider,
2573            &self.time,
2574            &self.store,
2575            &self.logger,
2576        )
2577        .await?;
2578
2579        // Directly confirmed as we're in a channel the fund tx is already confirmed.
2580        self.store
2581            .upsert_channel(
2582                Channel::Signed(signed_channel),
2583                Some(Contract::Confirmed(signed_contract)),
2584            )
2585            .await?;
2586
2587        Ok(msg)
2588    }
2589
2590    async fn on_renew_confirm(
2591        &self,
2592        renew_confirm: &RenewConfirm,
2593        peer_id: &PublicKey,
2594    ) -> Result<RenewFinalize, Error> {
2595        let mut signed_channel =
2596            get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2597        let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2598        let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2599            Error::InvalidState(
2600                "Expected to be in a state with an associated contract id but was not.".to_string(),
2601            )
2602        })?;
2603
2604        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2605            .roll_back_state
2606            .as_ref()
2607            .expect("to have a rollback state")
2608        {
2609            SignedChannelState::Established {
2610                own_buffer_adaptor_signature,
2611                buffer_transaction,
2612                signed_contract_id,
2613                ..
2614            } => {
2615                let closed_contract = Contract::Closed(
2616                    self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2617                        .await?,
2618                );
2619                (
2620                    TxType::Revoked {
2621                        update_idx: signed_channel.update_idx,
2622                        own_adaptor_signature: *own_buffer_adaptor_signature,
2623                        is_offer: false,
2624                        revoked_tx_type: RevokedTxType::Buffer,
2625                    },
2626                    buffer_transaction.compute_txid(),
2627                    Some(closed_contract),
2628                )
2629            }
2630            SignedChannelState::Settled {
2631                settle_tx,
2632                own_settle_adaptor_signature,
2633                ..
2634            } => (
2635                TxType::Revoked {
2636                    update_idx: signed_channel.update_idx,
2637                    own_adaptor_signature: *own_settle_adaptor_signature,
2638                    is_offer: false,
2639                    revoked_tx_type: RevokedTxType::Settle,
2640                },
2641                settle_tx.compute_txid(),
2642                None,
2643            ),
2644            s => {
2645                return Err(Error::InvalidState(format!(
2646                    "Expected rollback state Established or Revoked but found {s:?}"
2647                )))
2648            }
2649        };
2650        let accepted_contract =
2651            get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2652
2653        let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2654            &self.secp,
2655            &mut signed_channel,
2656            &accepted_contract,
2657            renew_confirm,
2658            PEER_TIMEOUT,
2659            &self.time,
2660            &self.wallet,
2661            &self.signer_provider,
2662            &self.chain_monitor,
2663            &self.store,
2664            &self.logger,
2665        )
2666        .await?;
2667
2668        self.chain_monitor.lock().await.add_tx(
2669            prev_tx_id,
2670            ChannelInfo {
2671                channel_id: signed_channel.channel_id,
2672                tx_type,
2673            },
2674        );
2675
2676        // Directly confirmed as we're in a channel the fund tx is already confirmed.
2677        self.store
2678            .upsert_channel(
2679                Channel::Signed(signed_channel),
2680                Some(Contract::Confirmed(signed_contract)),
2681            )
2682            .await?;
2683
2684        self.store
2685            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2686            .await?;
2687
2688        if let Some(closed_contract) = closed_contract {
2689            self.store.update_contract(&closed_contract).await?;
2690        }
2691
2692        Ok(msg)
2693    }
2694
2695    async fn on_renew_finalize(
2696        &self,
2697        renew_finalize: &RenewFinalize,
2698        peer_id: &PublicKey,
2699    ) -> Result<RenewRevoke, Error> {
2700        let mut signed_channel =
2701            get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2702        let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2703
2704        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2705            .roll_back_state
2706            .as_ref()
2707            .expect("to have a rollback state")
2708        {
2709            SignedChannelState::Established {
2710                own_buffer_adaptor_signature,
2711                buffer_transaction,
2712                signed_contract_id,
2713                ..
2714            } => {
2715                let closed_contract = self
2716                    .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2717                    .await?;
2718                (
2719                    TxType::Revoked {
2720                        update_idx: signed_channel.update_idx,
2721                        own_adaptor_signature: *own_buffer_adaptor_signature,
2722                        is_offer: false,
2723                        revoked_tx_type: RevokedTxType::Buffer,
2724                    },
2725                    buffer_transaction.compute_txid(),
2726                    Some(Contract::Closed(closed_contract)),
2727                )
2728            }
2729            SignedChannelState::Settled {
2730                settle_tx,
2731                own_settle_adaptor_signature,
2732                ..
2733            } => (
2734                TxType::Revoked {
2735                    update_idx: signed_channel.update_idx,
2736                    own_adaptor_signature: *own_settle_adaptor_signature,
2737                    is_offer: false,
2738                    revoked_tx_type: RevokedTxType::Settle,
2739                },
2740                settle_tx.compute_txid(),
2741                None,
2742            ),
2743            s => {
2744                return Err(Error::InvalidState(format!(
2745                    "Expected rollback state of Established or Settled but was {s:?}"
2746                )))
2747            }
2748        };
2749
2750        let msg = crate::channel_updater::renew_channel_on_finalize(
2751            &self.secp,
2752            &mut signed_channel,
2753            renew_finalize,
2754            &self.signer_provider,
2755        )?;
2756
2757        self.chain_monitor.lock().await.add_tx(
2758            prev_tx_id,
2759            ChannelInfo {
2760                channel_id: signed_channel.channel_id,
2761                tx_type,
2762            },
2763        );
2764
2765        let buffer_tx =
2766            get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2767
2768        self.chain_monitor.lock().await.add_tx(
2769            buffer_tx.compute_txid(),
2770            ChannelInfo {
2771                channel_id: signed_channel.channel_id,
2772                tx_type: TxType::BufferTx,
2773            },
2774        );
2775
2776        self.store
2777            .upsert_channel(Channel::Signed(signed_channel), None)
2778            .await?;
2779        self.store
2780            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2781            .await?;
2782
2783        if let Some(closed_contract) = closed_contract {
2784            self.store.update_contract(&closed_contract).await?;
2785        }
2786
2787        Ok(msg)
2788    }
2789
2790    async fn on_renew_revoke(
2791        &self,
2792        renew_revoke: &RenewRevoke,
2793        peer_id: &PublicKey,
2794    ) -> Result<(), Error> {
2795        let mut signed_channel =
2796            get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2797
2798        crate::channel_updater::renew_channel_on_revoke(
2799            &self.secp,
2800            &mut signed_channel,
2801            renew_revoke,
2802        )?;
2803
2804        self.store
2805            .upsert_channel(Channel::Signed(signed_channel), None)
2806            .await?;
2807
2808        Ok(())
2809    }
2810
2811    async fn on_collaborative_close_offer(
2812        &self,
2813        close_offer: &CollaborativeCloseOffer,
2814        peer_id: &PublicKey,
2815    ) -> Result<(), Error> {
2816        let mut signed_channel =
2817            get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2818
2819        crate::channel_updater::on_collaborative_close_offer(
2820            &mut signed_channel,
2821            close_offer,
2822            PEER_TIMEOUT,
2823            &self.time,
2824        )?;
2825
2826        self.store
2827            .upsert_channel(Channel::Signed(signed_channel), None)
2828            .await?;
2829
2830        Ok(())
2831    }
2832
2833    async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2834        let channel = self.store.get_channel(&reject.channel_id).await?;
2835
2836        if let Some(channel) = channel {
2837            if channel.get_counter_party_id() != *counter_party {
2838                return Err(Error::InvalidParameters(format!(
2839                    "Peer {:02x?} is not involved with {} {:02x?}.",
2840                    counter_party,
2841                    stringify!(Channel),
2842                    channel.get_id()
2843                )));
2844            }
2845            match channel {
2846                Channel::Offered(offered_channel) => {
2847                    let offered_contract = get_contract_in_state!(
2848                        self,
2849                        &offered_channel.offered_contract_id,
2850                        Offered,
2851                        None as Option<PublicKey>
2852                    )?;
2853                    let utxos = offered_contract
2854                        .funding_inputs
2855                        .iter()
2856                        .map(|funding_input| {
2857                            let txid = Transaction::consensus_decode(
2858                                &mut funding_input.prev_tx.as_slice(),
2859                            )
2860                            .expect("Transaction Decode Error")
2861                            .compute_txid();
2862                            let vout = funding_input.prev_tx_vout;
2863                            OutPoint { txid, vout }
2864                        })
2865                        .collect::<Vec<_>>();
2866
2867                    self.wallet.unreserve_utxos(&utxos)?;
2868
2869                    // remove rejected channel, since nothing has been confirmed on chain yet.
2870                    self.store
2871                        .upsert_channel(
2872                            Channel::Cancelled(offered_channel),
2873                            Some(Contract::Rejected(offered_contract)),
2874                        )
2875                        .await?;
2876                }
2877                Channel::Signed(mut signed_channel) => {
2878                    let contract = match signed_channel.state {
2879                        SignedChannelState::RenewOffered {
2880                            offered_contract_id,
2881                            ..
2882                        } => {
2883                            let offered_contract = get_contract_in_state!(
2884                                self,
2885                                &offered_contract_id,
2886                                Offered,
2887                                None::<PublicKey>
2888                            )?;
2889                            Some(Contract::Rejected(offered_contract))
2890                        }
2891                        _ => None,
2892                    };
2893
2894                    crate::channel_updater::on_reject(&mut signed_channel)?;
2895
2896                    self.store
2897                        .upsert_channel(Channel::Signed(signed_channel), contract)
2898                        .await?;
2899                }
2900                channel => {
2901                    return Err(Error::InvalidState(format!(
2902                        "Not in a state adequate to receive a reject message. {channel:?}"
2903                    )))
2904                }
2905            }
2906        } else {
2907            log_warn!(
2908                self.logger,
2909                "Couldn't find rejected dlc channel with id: {}",
2910                reject.channel_id.to_lower_hex_string()
2911            );
2912        }
2913
2914        Ok(())
2915    }
2916
2917    async fn channel_checks(&self) -> Result<(), Error> {
2918        let established_closing_channels = self
2919            .store
2920            .get_signed_channels(Some(SignedChannelStateType::Closing))
2921            .await?;
2922
2923        for channel in established_closing_channels {
2924            if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2925                log_error!(
2926                    self.logger,
2927                    "Error trying to close established channel: {}",
2928                    e
2929                );
2930            }
2931        }
2932
2933        if let Err(e) = self.check_for_timed_out_channels().await {
2934            log_error!(self.logger, "Error checking timed out channels {}", e);
2935        }
2936        self.check_for_watched_tx().await
2937    }
2938
2939    async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
2940        check_for_timed_out_channels!(self, RenewOffered);
2941        check_for_timed_out_channels!(self, RenewAccepted);
2942        check_for_timed_out_channels!(self, RenewConfirmed);
2943        check_for_timed_out_channels!(self, SettledOffered);
2944        check_for_timed_out_channels!(self, SettledAccepted);
2945        check_for_timed_out_channels!(self, SettledConfirmed);
2946
2947        Ok(())
2948    }
2949
2950    pub(crate) async fn process_watched_txs(
2951        &self,
2952        watched_txs: Vec<(Transaction, ChannelInfo)>,
2953    ) -> Result<(), Error> {
2954        for (tx, channel_info) in watched_txs {
2955            let mut signed_channel = match get_channel_in_state!(
2956                self,
2957                &channel_info.channel_id,
2958                Signed,
2959                None as Option<PublicKey>
2960            ) {
2961                Ok(c) => c,
2962                Err(e) => {
2963                    log_error!(
2964                        self.logger,
2965                        "Could not retrieve channel {:?}: {}",
2966                        channel_info.channel_id,
2967                        e
2968                    );
2969                    continue;
2970                }
2971            };
2972
2973            let persist = match channel_info.tx_type {
2974                TxType::BufferTx => {
2975                    // TODO(tibo): should only considered closed after some confirmations.
2976                    // Ideally should save previous state, and maybe restore in
2977                    // case of reorg, though if the counter party has sent the
2978                    // tx to close the channel it is unlikely that the tx will
2979                    // not be part of a future block.
2980
2981                    let contract_id = signed_channel
2982                        .get_contract_id()
2983                        .expect("to have a contract id");
2984                    let mut state = SignedChannelState::Closing {
2985                        buffer_transaction: tx.clone(),
2986                        is_initiator: false,
2987                        contract_id,
2988                        keys_id: signed_channel.keys_id().ok_or_else(|| {
2989                            Error::InvalidState("Expected to have keys_id.".to_string())
2990                        })?,
2991                    };
2992                    std::mem::swap(&mut signed_channel.state, &mut state);
2993
2994                    signed_channel.roll_back_state = Some(state);
2995
2996                    self.store
2997                        .upsert_channel(Channel::Signed(signed_channel), None)
2998                        .await?;
2999
3000                    false
3001                }
3002                TxType::Revoked {
3003                    update_idx,
3004                    own_adaptor_signature,
3005                    is_offer,
3006                    revoked_tx_type,
3007                } => {
3008                    let secret = signed_channel
3009                        .counter_party_commitment_secrets
3010                        .get_secret(update_idx)
3011                        .expect("to be able to retrieve the per update secret");
3012                    let counter_per_update_secret = SecretKey::from_slice(&secret)
3013                        .expect("to be able to parse the counter per update secret.");
3014
3015                    let per_update_seed_pk = signed_channel.own_per_update_seed;
3016
3017                    let per_update_seed_sk = self
3018                        .signer_provider
3019                        .get_secret_key_for_pubkey(&per_update_seed_pk)?;
3020
3021                    let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
3022                        per_update_seed_sk.as_ref(),
3023                        update_idx,
3024                    ))
3025                    .expect("a valid secret key.");
3026
3027                    let per_update_point =
3028                        PublicKey::from_secret_key(&self.secp, &per_update_secret);
3029
3030                    let own_revocation_params = signed_channel.own_points.get_revokable_params(
3031                        &self.secp,
3032                        &signed_channel.counter_points.revocation_basepoint,
3033                        &per_update_point,
3034                    );
3035
3036                    let counter_per_update_point =
3037                        PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
3038
3039                    let base_own_sk = self
3040                        .signer_provider
3041                        .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
3042
3043                    let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
3044
3045                    let counter_revocation_params =
3046                        signed_channel.counter_points.get_revokable_params(
3047                            &self.secp,
3048                            &signed_channel.own_points.revocation_basepoint,
3049                            &counter_per_update_point,
3050                        );
3051
3052                    let witness = if signed_channel.own_params.fund_pubkey
3053                        < signed_channel.counter_params.fund_pubkey
3054                    {
3055                        tx.input[0].witness.to_vec().remove(1)
3056                    } else {
3057                        tx.input[0].witness.to_vec().remove(2)
3058                    };
3059
3060                    let sig_data = witness
3061                        .iter()
3062                        .take(witness.len() - 1)
3063                        .cloned()
3064                        .collect::<Vec<_>>();
3065                    let own_sig = Signature::from_der(&sig_data)?;
3066
3067                    let counter_sk = own_adaptor_signature.recover(
3068                        &self.secp,
3069                        &own_sig,
3070                        &counter_revocation_params.publish_pk.inner,
3071                    )?;
3072
3073                    let own_revocation_base_secret =
3074                        &self.signer_provider.get_secret_key_for_pubkey(
3075                            &signed_channel.own_points.revocation_basepoint,
3076                        )?;
3077
3078                    let counter_revocation_sk = derive_private_revocation_key(
3079                        &self.secp,
3080                        &counter_per_update_secret,
3081                        own_revocation_base_secret,
3082                    );
3083
3084                    let (offer_params, accept_params) = if is_offer {
3085                        (&own_revocation_params, &counter_revocation_params)
3086                    } else {
3087                        (&counter_revocation_params, &own_revocation_params)
3088                    };
3089
3090                    let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
3091                        lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
3092                    ) / 250)
3093                        .into();
3094
3095                    let signed_tx = match revoked_tx_type {
3096                        RevokedTxType::Buffer => {
3097                            ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
3098                                &self.secp,
3099                                offer_params,
3100                                accept_params,
3101                                &own_sk,
3102                                &counter_sk,
3103                                &counter_revocation_sk,
3104                                &tx,
3105                                &self.wallet.get_new_address().await?,
3106                                0,
3107                                fee_rate_per_vb,
3108                            )?
3109                        }
3110                        RevokedTxType::Settle => {
3111                            ddk_dlc::channel::create_and_sign_punish_settle_transaction(
3112                                &self.secp,
3113                                offer_params,
3114                                accept_params,
3115                                &own_sk,
3116                                &counter_sk,
3117                                &counter_revocation_sk,
3118                                &tx,
3119                                &self.wallet.get_new_address().await?,
3120                                CET_NSEQUENCE,
3121                                0,
3122                                fee_rate_per_vb,
3123                                is_offer,
3124                            )?
3125                        }
3126                    };
3127
3128                    self.blockchain.send_transaction(&signed_tx).await?;
3129
3130                    let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
3131                        counter_party: signed_channel.counter_party,
3132                        temporary_channel_id: signed_channel.temporary_channel_id,
3133                        channel_id: signed_channel.channel_id,
3134                        punish_txid: signed_tx.compute_txid(),
3135                    });
3136
3137                    //TODO(tibo): should probably make sure the tx is confirmed somewhere before
3138                    //stop watching the cheating tx.
3139                    self.chain_monitor
3140                        .lock()
3141                        .await
3142                        .cleanup_channel(signed_channel.channel_id);
3143                    self.store.upsert_channel(closed_channel, None).await?;
3144                    true
3145                }
3146                TxType::CollaborativeClose => {
3147                    if let Some(SignedChannelState::Established {
3148                        signed_contract_id, ..
3149                    }) = signed_channel.roll_back_state
3150                    {
3151                        let counter_payout = get_signed_channel_state!(
3152                            signed_channel,
3153                            CollaborativeCloseOffered,
3154                            counter_payout
3155                        )?;
3156                        let closed_contract = self
3157                            .get_collaboratively_closed_contract(
3158                                &signed_contract_id,
3159                                *counter_payout,
3160                                false,
3161                            )
3162                            .await?;
3163                        self.store
3164                            .update_contract(&Contract::Closed(closed_contract))
3165                            .await?;
3166                    }
3167                    let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
3168                        counter_party: signed_channel.counter_party,
3169                        temporary_channel_id: signed_channel.temporary_channel_id,
3170                        channel_id: signed_channel.channel_id,
3171                    });
3172                    self.chain_monitor
3173                        .lock()
3174                        .await
3175                        .cleanup_channel(signed_channel.channel_id);
3176                    self.store.upsert_channel(closed_channel, None).await?;
3177                    true
3178                }
3179                TxType::SettleTx => {
3180                    let closed_channel = Channel::CounterClosed(ClosedChannel {
3181                        counter_party: signed_channel.counter_party,
3182                        temporary_channel_id: signed_channel.temporary_channel_id,
3183                        channel_id: signed_channel.channel_id,
3184                    });
3185                    self.chain_monitor
3186                        .lock()
3187                        .await
3188                        .cleanup_channel(signed_channel.channel_id);
3189                    self.store.upsert_channel(closed_channel, None).await?;
3190                    true
3191                }
3192                TxType::Cet => {
3193                    let contract_id = signed_channel.get_contract_id();
3194                    let closed_channel = {
3195                        match &signed_channel.state {
3196                            SignedChannelState::Closing { is_initiator, .. } => {
3197                                if *is_initiator {
3198                                    Channel::Closed(ClosedChannel {
3199                                        counter_party: signed_channel.counter_party,
3200                                        temporary_channel_id: signed_channel.temporary_channel_id,
3201                                        channel_id: signed_channel.channel_id,
3202                                    })
3203                                } else {
3204                                    Channel::CounterClosed(ClosedChannel {
3205                                        counter_party: signed_channel.counter_party,
3206                                        temporary_channel_id: signed_channel.temporary_channel_id,
3207                                        channel_id: signed_channel.channel_id,
3208                                    })
3209                                }
3210                            }
3211                            _ => {
3212                                log_error!(self.logger, "Saw spending of buffer transaction without being in closing state");
3213                                Channel::Closed(ClosedChannel {
3214                                    counter_party: signed_channel.counter_party,
3215                                    temporary_channel_id: signed_channel.temporary_channel_id,
3216                                    channel_id: signed_channel.channel_id,
3217                                })
3218                            }
3219                        }
3220                    };
3221
3222                    self.chain_monitor
3223                        .lock()
3224                        .await
3225                        .cleanup_channel(signed_channel.channel_id);
3226                    let pre_closed_contract = futures::stream::iter(contract_id)
3227                        .then(|contract_id| {
3228                            let txn = tx.clone();
3229                            async move {
3230                                self.store.get_contract(&contract_id).await.map(|contract| {
3231                                    contract.and_then(|contract| match contract {
3232                                        Contract::Confirmed(signed_contract) => {
3233                                            Some(Contract::PreClosed(PreClosedContract {
3234                                                signed_contract,
3235                                                attestations: None,
3236                                                signed_cet: txn,
3237                                            }))
3238                                        }
3239                                        _ => None,
3240                                    })
3241                                })
3242                            }
3243                        })
3244                        .try_collect::<Vec<_>>()
3245                        .await?
3246                        .into_iter()
3247                        .flatten()
3248                        .next();
3249
3250                    self.store
3251                        .upsert_channel(closed_channel, pre_closed_contract)
3252                        .await?;
3253
3254                    true
3255                }
3256            };
3257
3258            if persist {
3259                self.store
3260                    .persist_chain_monitor(&*self.chain_monitor.lock().await)
3261                    .await?;
3262            }
3263        }
3264        Ok(())
3265    }
3266
3267    async fn check_for_watched_tx(&self) -> Result<(), Error> {
3268        let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
3269
3270        self.process_watched_txs(confirmed_txs).await?;
3271
3272        self.store
3273            .persist_chain_monitor(&*self.chain_monitor.lock().await)
3274            .await?;
3275
3276        Ok(())
3277    }
3278
3279    async fn force_close_channel_internal(
3280        &self,
3281        mut channel: SignedChannel,
3282        is_initiator: bool,
3283    ) -> Result<(), Error> {
3284        match &channel.state {
3285            SignedChannelState::Established {
3286                counter_buffer_adaptor_signature,
3287                buffer_transaction,
3288                ..
3289            } => {
3290                let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
3291                let buffer_transaction = buffer_transaction.clone();
3292                self.initiate_unilateral_close_established_channel(
3293                    channel,
3294                    is_initiator,
3295                    counter_buffer_adaptor_signature,
3296                    buffer_transaction,
3297                )
3298                .await
3299            }
3300            SignedChannelState::RenewFinalized {
3301                buffer_transaction,
3302                offer_buffer_adaptor_signature,
3303                ..
3304            } => {
3305                let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
3306                let buffer_transaction = buffer_transaction.clone();
3307                self.initiate_unilateral_close_established_channel(
3308                    channel,
3309                    is_initiator,
3310                    offer_buffer_adaptor_signature,
3311                    buffer_transaction,
3312                )
3313                .await
3314            }
3315            SignedChannelState::Settled { .. } => {
3316                self.close_settled_channel(channel, is_initiator).await
3317            }
3318            SignedChannelState::SettledOffered { .. }
3319            | SignedChannelState::SettledReceived { .. }
3320            | SignedChannelState::SettledAccepted { .. }
3321            | SignedChannelState::SettledConfirmed { .. }
3322            | SignedChannelState::RenewOffered { .. }
3323            | SignedChannelState::RenewAccepted { .. }
3324            | SignedChannelState::RenewConfirmed { .. }
3325            | SignedChannelState::CollaborativeCloseOffered { .. } => {
3326                channel.state = channel
3327                    .roll_back_state
3328                    .take()
3329                    .expect("to have a rollback state");
3330                let channel_clone = channel.clone(); // Clone the channel to avoid moving it
3331                Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
3332            }
3333            SignedChannelState::Closing { .. } => Err(Error::InvalidState(
3334                "Channel is already closing.".to_string(),
3335            )),
3336        }
3337    }
3338
3339    /// Initiate the unilateral closing of a channel that has been established.
3340    async fn initiate_unilateral_close_established_channel(
3341        &self,
3342        mut signed_channel: SignedChannel,
3343        is_initiator: bool,
3344        buffer_adaptor_signature: EcdsaAdaptorSignature,
3345        buffer_transaction: Transaction,
3346    ) -> Result<(), Error> {
3347        let keys_id = signed_channel.keys_id().ok_or_else(|| {
3348            Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
3349        })?;
3350
3351        crate::channel_updater::initiate_unilateral_close_established_channel(
3352            &self.secp,
3353            &mut signed_channel,
3354            buffer_adaptor_signature,
3355            keys_id,
3356            buffer_transaction,
3357            &self.signer_provider,
3358            is_initiator,
3359        )?;
3360
3361        let buffer_transaction =
3362            get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3363
3364        self.blockchain.send_transaction(buffer_transaction).await?;
3365
3366        self.chain_monitor
3367            .lock()
3368            .await
3369            .remove_tx(&buffer_transaction.compute_txid());
3370
3371        self.store
3372            .upsert_channel(Channel::Signed(signed_channel), None)
3373            .await?;
3374
3375        self.store
3376            .persist_chain_monitor(&*self.chain_monitor.lock().await)
3377            .await?;
3378
3379        Ok(())
3380    }
3381
3382    /// Unilaterally close a channel that has been settled.
3383    async fn close_settled_channel(
3384        &self,
3385        signed_channel: SignedChannel,
3386        is_initiator: bool,
3387    ) -> Result<(), Error> {
3388        let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3389            &self.secp,
3390            &signed_channel,
3391            &self.signer_provider,
3392            is_initiator,
3393        )?;
3394
3395        if self
3396            .blockchain
3397            .get_transaction_confirmations(&settle_tx.compute_txid())
3398            .await
3399            .unwrap_or(0)
3400            == 0
3401        {
3402            self.blockchain.send_transaction(&settle_tx).await?;
3403        }
3404
3405        self.chain_monitor
3406            .lock()
3407            .await
3408            .cleanup_channel(signed_channel.channel_id);
3409
3410        self.store.upsert_channel(closed_channel, None).await?;
3411
3412        Ok(())
3413    }
3414
3415    async fn get_collaboratively_closed_contract(
3416        &self,
3417        contract_id: &ContractId,
3418        payout: Amount,
3419        is_own_payout: bool,
3420    ) -> Result<ClosedContract, Error> {
3421        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3422        let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3423            contract
3424                .accepted_contract
3425                .offered_contract
3426                .offer_params
3427                .collateral
3428        } else {
3429            contract.accepted_contract.accept_params.collateral
3430        };
3431        let own_payout = if is_own_payout {
3432            payout
3433        } else {
3434            contract.accepted_contract.offered_contract.total_collateral - payout
3435        };
3436        let pnl =
3437            SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3438        Ok(ClosedContract {
3439            attestations: None,
3440            signed_cet: None,
3441            contract_id: *contract_id,
3442            temporary_contract_id: contract.accepted_contract.offered_contract.id,
3443            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3444            funding_txid: contract
3445                .accepted_contract
3446                .dlc_transactions
3447                .fund
3448                .compute_txid(),
3449            pnl,
3450            signed_contract: contract.clone(),
3451        })
3452    }
3453
3454    async fn oracle_announcements(
3455        &self,
3456        contract_input: &ContractInput,
3457    ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3458        let announcements = stream::iter(contract_input.contract_infos.iter())
3459            .map(|x| {
3460                let future = self.get_oracle_announcements(&x.oracles);
3461                async move {
3462                    match future.await {
3463                        Ok(result) => Ok(result),
3464                        Err(e) => {
3465                            log_error!(self.logger, "Failed to get oracle announcements: {}", e);
3466                            Err(e)
3467                        }
3468                    }
3469                }
3470            })
3471            .collect::<FuturesUnordered<_>>()
3472            .await
3473            .try_collect::<Vec<_>>()
3474            .await?;
3475        Ok(announcements)
3476    }
3477}