ddk_manager/
manager.rs

//! # Manager
//!
//! A component to create and update DLCs.
2
3use super::{
4    Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13    accepted_contract::AcceptedContract, contract_info::ContractInfo,
14    contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15    signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16    FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::Address;
26use bitcoin::{OutPoint, Transaction};
27use dlc_messages::channel::{
28    AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
29    RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
30    SettleOffer, SignChannel,
31};
32use dlc_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
33use dlc_messages::{AcceptDlc, Message as DlcMessage, OfferDlc, SignDlc};
34use futures::stream;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt};
37use hex::DisplayHex;
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40    build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use secp256k1_zkp::XOnlyPublicKey;
43use secp256k1_zkp::{
44    ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
45};
46use std::collections::HashMap;
47use std::ops::Deref;
48use std::string::ToString;
49use std::sync::Arc;
50use tokio::sync::Mutex;
51
/// The number of confirmations required before moving to the confirmed state.
pub const NB_CONFIRMATIONS: u32 = 6;
/// The delay to set the refund value to.
///
/// NOTE(review): `86400 * 7` looks like one week expressed in seconds — confirm
/// the unit against the consumers of this constant.
pub const REFUND_DELAY: u32 = 86400 * 7;
/// The nSequence value used for CETs in DLC channels.
pub const CET_NSEQUENCE: u32 = 288;
/// Timeout in seconds when waiting for a peer's reply, after which a DLC channel
/// is forced closed.
pub const PEER_TIMEOUT: u64 = 3600;
61
/// Data needed to close a contract: the selected contract info, the adaptor
/// info paired with it, and the collected oracle attestations, each tagged
/// with a `usize` index.
///
/// `None` means no contract info could be closed yet.
type ClosableContractInfo<'a> = Option<(
    &'a ContractInfo,
    &'a AdaptorInfo,
    Vec<(usize, OracleAttestation)>,
)>;
67
/// Used to create and update DLCs.
///
/// Every collaborator is a `Deref` to a trait implementation, so callers can
/// choose the pointer type (reference, `Arc`, ...) that suits them.
pub struct Manager<
    W: Deref,
    SP: Deref,
    B: Deref,
    S: Deref,
    O: Deref,
    T: Deref,
    F: Deref,
    X: ContractSigner,
> where
    W::Target: Wallet,
    SP::Target: ContractSignerProvider<Signer = X>,
    B::Target: Blockchain,
    S::Target: Storage,
    O::Target: Oracle,
    T::Target: Time,
    F::Target: FeeEstimator,
{
    /// Oracle clients, looked up by the oracle's x-only public key.
    oracles: HashMap<XOnlyPublicKey, O>,
    /// Wallet handed to the contract updater functions and used to import
    /// funding addresses.
    wallet: W,
    /// Provider used to derive key ids and contract signers.
    signer_provider: SP,
    /// Blockchain access: heights, blocks, confirmations and broadcasting.
    blockchain: B,
    /// Persistent storage for contracts, channels and the chain monitor.
    store: S,
    /// Secp256k1 context passed to signing and verification routines.
    secp: Secp256k1<All>,
    /// Chain monitor fed with new blocks by `periodic_chain_monitor`.
    chain_monitor: Mutex<ChainMonitor>,
    /// Time source providing the current unix time.
    time: T,
    /// Fee estimator.
    /// NOTE(review): not used in the visible part of this file — confirm usage.
    fee_estimator: F,
}
97
/// Shorthand for `get_object_in_state!` specialised for contracts: it forwards
/// the manager, contract id, expected state and peer id, and fixes the object
/// type to `Contract` and the getter to `get_contract`.
///
/// NOTE(review): the exact lookup and error behaviour is defined by
/// `get_object_in_state!` in `crate::utils` — see that macro for details.
macro_rules! get_contract_in_state {
    ($manager: ident, $contract_id: expr, $state: ident, $peer_id: expr) => {{
        get_object_in_state!(
            $manager,
            $contract_id,
            $state,
            $peer_id,
            Contract,
            get_contract
        )
    }};
}
110
/// Shorthand for `get_object_in_state!` specialised for channels: it forwards
/// the manager, channel id, expected state and peer id, and fixes the object
/// type to `Channel` and the getter to `get_channel`.
///
/// NOTE(review): the exact lookup and error behaviour is defined by
/// `get_object_in_state!` in `crate::utils` — see that macro for details.
macro_rules! get_channel_in_state {
    ($manager: ident, $channel_id: expr, $state: ident, $peer_id: expr) => {{
        get_object_in_state!(
            $manager,
            $channel_id,
            $state,
            $peer_id,
            Channel,
            get_channel
        )
    }};
}
123
/// Extracts the named fields from the rollback state of a signed channel.
///
/// Evaluates to `Ok((&field, ...))` when `$signed_channel.roll_back_state` is
/// `Some(SignedChannelState::$state { .. })`, and to
/// `Err(Error::InvalidState(..))` otherwise. The error message reports the
/// rollback state that was actually found (including `None`), since that is
/// the value the expectation is checked against.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state { $($field,)* .. }) => Ok(($($field,)*)),
            _ => Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($state),
                $signed_channel.roll_back_state
            ))),
        }
    }};
}
132
/// Force closes every stored signed channel in state `$state` whose `timeout`
/// is earlier than the manager's current unix time.
///
/// A failure to load the channels is propagated with `?`; a failure to force
/// close an individual channel is only logged so the remaining channels are
/// still processed.
macro_rules! check_for_timed_out_channels {
    ($manager: ident, $state: ident) => {
        let candidates = $manager
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))
            .await?;

        for channel in candidates {
            if let SignedChannelState::$state { timeout, .. } = channel.state {
                if timeout < $manager.time.unix_time_now() {
                    if let Err(e) = $manager.force_close_channel_internal(channel, true).await {
                        tracing::error!("Error force closing channel {}", e);
                    }
                }
            }
        }
    };
}
153
154impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
155    Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
156where
157    W::Target: Wallet,
158    SP::Target: ContractSignerProvider<Signer = X>,
159    B::Target: Blockchain,
160    S::Target: Storage,
161    O::Target: Oracle,
162    T::Target: Time,
163    F::Target: FeeEstimator,
164{
165    /// Create a new Manager struct.
166    pub async fn new(
167        wallet: W,
168        signer_provider: SP,
169        blockchain: B,
170        store: S,
171        oracles: HashMap<XOnlyPublicKey, O>,
172        time: T,
173        fee_estimator: F,
174    ) -> Result<Self, Error> {
175        let init_height = blockchain.get_blockchain_height().await?;
176        let chain_monitor = Mutex::new(
177            store
178                .get_chain_monitor()
179                .await?
180                .unwrap_or(ChainMonitor::new(init_height)),
181        );
182
183        let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
184
185        Ok(Manager {
186            secp: secp256k1_zkp::Secp256k1::new(),
187            wallet,
188            signer_provider,
189            blockchain,
190            store,
191            oracles,
192            time,
193            fee_estimator,
194            chain_monitor,
195        })
196    }
197
    /// Get the store from the Manager to access contracts.
    ///
    /// Returns a shared reference to the storage handle held by this manager.
    pub fn get_store(&self) -> &S {
        &self.store
    }
202
203    /// Function called to pass a DlcMessage to the Manager.
204    pub async fn on_dlc_message(
205        &self,
206        msg: &DlcMessage,
207        counter_party: PublicKey,
208    ) -> Result<Option<DlcMessage>, Error> {
209        match msg {
210            DlcMessage::Offer(o) => {
211                self.on_offer_message(o, counter_party).await?;
212                Ok(None)
213            }
214            DlcMessage::Accept(a) => Ok(Some(self.on_accept_message(a, &counter_party).await?)),
215            DlcMessage::Sign(s) => {
216                self.on_sign_message(s, &counter_party).await?;
217                Ok(None)
218            }
219            DlcMessage::OfferChannel(o) => {
220                self.on_offer_channel(o, counter_party).await?;
221                Ok(None)
222            }
223            DlcMessage::AcceptChannel(a) => Ok(Some(DlcMessage::SignChannel(
224                self.on_accept_channel(a, &counter_party).await?,
225            ))),
226            DlcMessage::SignChannel(s) => {
227                self.on_sign_channel(s, &counter_party).await?;
228                Ok(None)
229            }
230            DlcMessage::SettleOffer(s) => match self.on_settle_offer(s, &counter_party).await? {
231                Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
232                None => Ok(None),
233            },
234            DlcMessage::SettleAccept(s) => Ok(Some(DlcMessage::SettleConfirm(
235                self.on_settle_accept(s, &counter_party).await?,
236            ))),
237            DlcMessage::SettleConfirm(s) => Ok(Some(DlcMessage::SettleFinalize(
238                self.on_settle_confirm(s, &counter_party).await?,
239            ))),
240            DlcMessage::SettleFinalize(s) => {
241                self.on_settle_finalize(s, &counter_party).await?;
242                Ok(None)
243            }
244            DlcMessage::RenewOffer(r) => match self.on_renew_offer(r, &counter_party).await? {
245                Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
246                None => Ok(None),
247            },
248            DlcMessage::RenewAccept(r) => Ok(Some(DlcMessage::RenewConfirm(
249                self.on_renew_accept(r, &counter_party).await?,
250            ))),
251            DlcMessage::RenewConfirm(r) => Ok(Some(DlcMessage::RenewFinalize(
252                self.on_renew_confirm(r, &counter_party).await?,
253            ))),
254            DlcMessage::RenewFinalize(r) => {
255                let revoke = self.on_renew_finalize(r, &counter_party).await?;
256                Ok(Some(DlcMessage::RenewRevoke(revoke)))
257            }
258            DlcMessage::RenewRevoke(r) => {
259                self.on_renew_revoke(r, &counter_party).await?;
260                Ok(None)
261            }
262            DlcMessage::CollaborativeCloseOffer(c) => {
263                self.on_collaborative_close_offer(c, &counter_party).await?;
264                Ok(None)
265            }
266            DlcMessage::Reject(r) => {
267                self.on_reject(r, &counter_party).await?;
268                Ok(None)
269            }
270        }
271    }
272
273    /// Function called to create a new DLC. The offered contract will be stored
274    /// and an OfferDlc message returned.
275    ///
276    /// This function will fetch the oracle announcements from the oracle.
277    pub async fn send_offer(
278        &self,
279        contract_input: &ContractInput,
280        counter_party: PublicKey,
281    ) -> Result<OfferDlc, Error> {
282        // If the oracle announcement fails to retrieve, then log and continue.
283        let oracle_announcements = self.oracle_announcements(contract_input).await?;
284
285        self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
286            .await
287    }
288
289    /// Function called to create a new DLC. The offered contract will be stored
290    /// and an OfferDlc message returned.
291    ///
292    /// This function allows to pass the oracle announcements directly instead of
293    /// fetching them from the oracle.
294    pub async fn send_offer_with_announcements(
295        &self,
296        contract_input: &ContractInput,
297        counter_party: PublicKey,
298        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
299    ) -> Result<OfferDlc, Error> {
300        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
301            &self.secp,
302            contract_input,
303            oracle_announcements,
304            REFUND_DELAY,
305            &counter_party,
306            &self.wallet,
307            &self.blockchain,
308            &self.time,
309            &self.signer_provider,
310        )
311        .await?;
312
313        offered_contract.validate()?;
314
315        self.store.create_contract(&offered_contract).await?;
316
317        Ok(offer_msg)
318    }
319
320    /// Function to call to accept a DLC for which an offer was received.
321    pub async fn accept_contract_offer(
322        &self,
323        contract_id: &ContractId,
324    ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
325        let offered_contract =
326            get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
327
328        let counter_party = offered_contract.counter_party;
329
330        let (accepted_contract, accept_msg) = accept_contract(
331            &self.secp,
332            &offered_contract,
333            &self.wallet,
334            &self.signer_provider,
335            &self.blockchain,
336        )
337        .await?;
338
339        self.wallet.import_address(&Address::p2wsh(
340            &accepted_contract.dlc_transactions.funding_script_pubkey,
341            self.blockchain.get_network()?,
342        ))?;
343
344        let contract_id = accepted_contract.get_contract_id();
345
346        self.store
347            .update_contract(&Contract::Accepted(accepted_contract))
348            .await?;
349
350        Ok((contract_id, counter_party, accept_msg))
351    }
352
353    /// Function to update the state of the [`ChainMonitor`] with new
354    /// blocks.
355    ///
356    /// Consumers **MUST** call this periodically in order to
357    /// determine when pending transactions reach confirmation.
358    pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
359        let cur_height = self.blockchain.get_blockchain_height().await?;
360        let last_height = self.chain_monitor.lock().await.last_height;
361
362        // TODO(luckysori): We could end up reprocessing a block at
363        // the same height if there is a reorg.
364        if cur_height < last_height {
365            return Err(Error::InvalidState(
366                "Current height is lower than last height.".to_string(),
367            ));
368        }
369
370        for height in last_height + 1..=cur_height {
371            let block = self.blockchain.get_block_at_height(height).await?;
372
373            self.chain_monitor
374                .lock()
375                .await
376                .process_block(&block, height);
377        }
378
379        Ok(())
380    }
381
382    /// Function to call to check the state of the currently executing DLCs and
383    /// update them if possible.
384    pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
385        tracing::debug!("Periodic check.");
386        self.check_signed_contracts().await?;
387        self.check_confirmed_contracts().await?;
388        self.check_preclosed_contracts().await?;
389
390        if check_channels {
391            self.channel_checks().await?;
392        }
393
394        Ok(())
395    }
396
397    async fn on_offer_message(
398        &self,
399        offered_message: &OfferDlc,
400        counter_party: PublicKey,
401    ) -> Result<(), Error> {
402        offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
403        let keys_id = self
404            .signer_provider
405            .derive_signer_key_id(false, offered_message.temporary_contract_id);
406        let contract: OfferedContract =
407            OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
408        contract.validate()?;
409
410        if self.store.get_contract(&contract.id).await?.is_some() {
411            return Err(Error::InvalidParameters(
412                "Contract with identical id already exists".to_string(),
413            ));
414        }
415
416        self.store.create_contract(&contract).await?;
417
418        Ok(())
419    }
420
421    async fn on_accept_message(
422        &self,
423        accept_msg: &AcceptDlc,
424        counter_party: &PublicKey,
425    ) -> Result<DlcMessage, Error> {
426        let offered_contract = get_contract_in_state!(
427            self,
428            &accept_msg.temporary_contract_id,
429            Offered,
430            Some(*counter_party)
431        )?;
432
433        let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
434            &self.secp,
435            &offered_contract,
436            accept_msg,
437            &self.wallet,
438            &self.signer_provider,
439        ) {
440            Ok(contract) => contract,
441            Err(e) => {
442                return self
443                    .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
444                    .await
445            }
446        };
447
448        self.wallet.import_address(&Address::p2wsh(
449            &signed_contract
450                .accepted_contract
451                .dlc_transactions
452                .funding_script_pubkey,
453            self.blockchain.get_network()?,
454        ))?;
455
456        self.store
457            .update_contract(&Contract::Signed(signed_contract))
458            .await?;
459
460        Ok(DlcMessage::Sign(signed_msg))
461    }
462
463    async fn on_sign_message(
464        &self,
465        sign_message: &SignDlc,
466        peer_id: &PublicKey,
467    ) -> Result<(), Error> {
468        let accepted_contract =
469            get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
470
471        let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
472            &self.secp,
473            &accepted_contract,
474            sign_message,
475            &self.wallet,
476        ) {
477            Ok(contract) => contract,
478            Err(e) => {
479                return self
480                    .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
481                    .await
482            }
483        };
484
485        self.store
486            .update_contract(&Contract::Signed(signed_contract))
487            .await?;
488
489        self.blockchain.send_transaction(&fund_tx).await?;
490
491        Ok(())
492    }
493
494    async fn get_oracle_announcements(
495        &self,
496        oracle_inputs: &OracleInput,
497    ) -> Result<Vec<OracleAnnouncement>, Error> {
498        let mut announcements = Vec::new();
499        for pubkey in &oracle_inputs.public_keys {
500            let oracle = self
501                .oracles
502                .get(pubkey)
503                .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
504            let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
505            announcements.push(announcement);
506        }
507
508        Ok(announcements)
509    }
510
511    async fn sign_fail_on_error<R>(
512        &self,
513        accepted_contract: AcceptedContract,
514        sign_message: SignDlc,
515        e: Error,
516    ) -> Result<R, Error> {
517        tracing::error!("Error in on_sign {}", e);
518        self.store
519            .update_contract(&Contract::FailedSign(FailedSignContract {
520                accepted_contract,
521                sign_message,
522                error_message: e.to_string(),
523            }))
524            .await?;
525        Err(e)
526    }
527
528    async fn accept_fail_on_error<R>(
529        &self,
530        offered_contract: OfferedContract,
531        accept_message: AcceptDlc,
532        e: Error,
533    ) -> Result<R, Error> {
534        tracing::error!("Error in on_accept {}", e);
535        self.store
536            .update_contract(&Contract::FailedAccept(FailedAcceptContract {
537                offered_contract,
538                accept_message,
539                error_message: e.to_string(),
540            }))
541            .await?;
542        Err(e)
543    }
544
545    async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
546        let confirmations = self
547            .blockchain
548            .get_transaction_confirmations(
549                &contract
550                    .accepted_contract
551                    .dlc_transactions
552                    .fund
553                    .compute_txid(),
554            )
555            .await?;
556        if confirmations >= NB_CONFIRMATIONS {
557            tracing::info!(
558                confirmations,
559                contract_id = contract.accepted_contract.get_contract_id_string(),
560                "Marking contract as confirmed."
561            );
562            self.store
563                .update_contract(&Contract::Confirmed(contract.clone()))
564                .await?;
565        } else {
566            tracing::info!(
567                confirmations,
568                required = NB_CONFIRMATIONS,
569                contract_id = contract.accepted_contract.get_contract_id_string(),
570                "Not enough confirmations to mark contract as confirmed."
571            );
572        }
573        Ok(())
574    }
575
576    async fn check_signed_contracts(&self) -> Result<(), Error> {
577        for c in self.store.get_signed_contracts().await? {
578            if let Err(e) = self.check_signed_contract(&c).await {
579                tracing::error!(
580                    "Error checking confirmed contract {}: {}",
581                    c.accepted_contract.get_contract_id_string(),
582                    e
583                )
584            }
585        }
586
587        Ok(())
588    }
589
590    async fn check_confirmed_contracts(&self) -> Result<(), Error> {
591        for c in self.store.get_confirmed_contracts().await? {
592            // Confirmed contracts from channel are processed in channel specific methods.
593            if c.channel_id.is_some() {
594                continue;
595            }
596            if let Err(e) = self.check_confirmed_contract(&c).await {
597                tracing::error!(
598                    "Error checking confirmed contract {}: {}",
599                    c.accepted_contract.get_contract_id_string(),
600                    e
601                )
602            }
603        }
604
605        Ok(())
606    }
607
608    async fn get_closable_contract_info<'a>(
609        &'a self,
610        contract: &'a SignedContract,
611    ) -> ClosableContractInfo<'a> {
612        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
613        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
614        for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
615            let matured: Vec<_> = contract_info
616                .oracle_announcements
617                .iter()
618                .filter(|x| {
619                    (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
620                })
621                .enumerate()
622                .collect();
623            if matured.len() >= contract_info.threshold {
624                let attestations = stream::iter(matured.iter())
625                    .map(|(i, announcement)| async move {
626                        // First try to get the oracle
627                        let oracle = match self.oracles.get(&announcement.oracle_public_key) {
628                            Some(oracle) => oracle,
629                            None => {
630                                tracing::debug!(
631                                    "Oracle not found for key: {}",
632                                    announcement.oracle_public_key
633                                );
634                                return None;
635                            }
636                        };
637
638                        // Then try to get the attestation
639                        let attestation = match oracle
640                            .get_attestation(&announcement.oracle_event.event_id)
641                            .await
642                        {
643                            Ok(attestation) => attestation,
644                            Err(e) => {
645                                tracing::error!(
646                                    "Attestation not found for event. id={} error={}",
647                                    announcement.oracle_event.event_id,
648                                    e.to_string()
649                                );
650                                return None;
651                            }
652                        };
653
654                        // Validate the attestation
655                        if let Err(e) = attestation.validate(&self.secp, announcement) {
656                            tracing::error!(
657                                "Oracle attestation is not valid. pubkey={} event_id={}, error={:?}",
658                                announcement.oracle_public_key,
659                                announcement.oracle_event.event_id,
660                                e
661                            );
662                            return None;
663                        }
664
665                        Some((*i, attestation))
666                    })
667                    .collect::<FuturesUnordered<_>>()
668                    .await
669                    .filter_map(|result| async move { result }) // Filter out None values
670                    .collect::<Vec<_>>()
671                    .await;
672                if attestations.len() >= contract_info.threshold {
673                    return Some((contract_info, adaptor_info, attestations));
674                }
675            }
676        }
677        None
678    }
679
680    async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
681        let closable_contract_info = self.get_closable_contract_info(contract).await;
682        if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
683            let offer = &contract.accepted_contract.offered_contract;
684            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
685
686            //  === WARNING ===
687            // This code could potentially be problematic. When running refund tests, it would look for a CET
688            // but the CET would be invalid and refund would not pass. By only updating with a valid CET,
689            // we then go to update. This way if it fails we can check for refund instead of bailing and getting locked
690            // funds.
691            if let Ok(cet) = crate::contract_updater::get_signed_cet(
692                &self.secp,
693                contract,
694                contract_info,
695                adaptor_info,
696                &attestations,
697                &signer,
698            ) {
699                match self
700                    .close_contract(
701                        contract,
702                        cet,
703                        attestations.iter().map(|x| x.1.clone()).collect(),
704                    )
705                    .await
706                {
707                    Ok(closed_contract) => {
708                        self.store.update_contract(&closed_contract).await?;
709                        return Ok(());
710                    }
711                    Err(e) => {
712                        tracing::warn!(
713                            "Failed to close contract {}: {}",
714                            contract.accepted_contract.get_contract_id_string(),
715                            e
716                        );
717                        return Err(e);
718                    }
719                }
720            }
721        }
722
723        self.check_refund(contract).await?;
724
725        Ok(())
726    }
727
728    /// Manually close a contract with the oracle attestations.
729    pub async fn close_confirmed_contract(
730        &self,
731        contract_id: &ContractId,
732        attestations: Vec<(usize, OracleAttestation)>,
733    ) -> Result<Contract, Error> {
734        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
735        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
736        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
737
738        // find the contract info that matches the attestations
739        if let Some((contract_info, adaptor_info)) =
740            contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
741                let matches = attestations
742                    .iter()
743                    .filter(|(i, a)| {
744                        c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
745                    })
746                    .count();
747
748                matches >= c.threshold
749            })
750        {
751            let offer = &contract.accepted_contract.offered_contract;
752            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
753            let cet = crate::contract_updater::get_signed_cet(
754                &self.secp,
755                &contract,
756                contract_info,
757                adaptor_info,
758                &attestations,
759                &signer,
760            )?;
761
762            // Check that the lock time has passed
763            let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
764                .expect("Time is not in valid range. This should never happen.");
765            let height =
766                Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
767                    .expect("Height is not in valid range. This should never happen.");
768            let locktime = cet.lock_time;
769
770            if !locktime.is_satisfied_by(height, time) {
771                return Err(Error::InvalidState(
772                    "CET lock time has not passed yet".to_string(),
773                ));
774            }
775
776            match self
777                .close_contract(
778                    &contract,
779                    cet,
780                    attestations.into_iter().map(|x| x.1).collect(),
781                )
782                .await
783            {
784                Ok(closed_contract) => {
785                    self.store.update_contract(&closed_contract).await?;
786                    Ok(closed_contract)
787                }
788                Err(e) => {
789                    tracing::warn!(
790                        "Failed to close contract {}: {e}",
791                        contract.accepted_contract.get_contract_id_string()
792                    );
793                    Err(e)
794                }
795            }
796        } else {
797            Err(Error::InvalidState(
798                "Attestations did not match contract infos".to_string(),
799            ))
800        }
801    }
802
803    async fn check_preclosed_contracts(&self) -> Result<(), Error> {
804        for c in self.store.get_preclosed_contracts().await? {
805            if let Err(e) = self.check_preclosed_contract(&c).await {
806                tracing::error!(
807                    "Error checking pre-closed contract {}: {}",
808                    c.signed_contract.accepted_contract.get_contract_id_string(),
809                    e
810                )
811            }
812        }
813
814        Ok(())
815    }
816
817    async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
818        let broadcasted_txid = contract.signed_cet.compute_txid();
819        let confirmations = self
820            .blockchain
821            .get_transaction_confirmations(&broadcasted_txid)
822            .await?;
823        if confirmations >= NB_CONFIRMATIONS {
824            let closed_contract = ClosedContract {
825                attestations: contract.attestations.clone(),
826                signed_cet: Some(contract.signed_cet.clone()),
827                contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
828                temporary_contract_id: contract
829                    .signed_contract
830                    .accepted_contract
831                    .offered_contract
832                    .id,
833                counter_party_id: contract
834                    .signed_contract
835                    .accepted_contract
836                    .offered_contract
837                    .counter_party,
838                pnl: contract
839                    .signed_contract
840                    .accepted_contract
841                    .compute_pnl(&contract.signed_cet),
842            };
843            self.store
844                .update_contract(&Contract::Closed(closed_contract))
845                .await?;
846        }
847
848        Ok(())
849    }
850
851    async fn close_contract(
852        &self,
853        contract: &SignedContract,
854        signed_cet: Transaction,
855        attestations: Vec<OracleAttestation>,
856    ) -> Result<Contract, Error> {
857        let confirmations = self
858            .blockchain
859            .get_transaction_confirmations(&signed_cet.compute_txid())
860            .await?;
861
862        if confirmations < 1 {
863            tracing::info!(
864                txid = signed_cet.compute_txid().to_string(),
865                "Broadcasting signed CET."
866            );
867            // TODO(tibo): if this fails because another tx is already in
868            // mempool or blockchain, we might have been cheated. There is
869            // not much to be done apart from possibly extracting a fraud
870            // proof but ideally it should be handled.
871            self.blockchain.send_transaction(&signed_cet).await?;
872
873            let preclosed_contract = PreClosedContract {
874                signed_contract: contract.clone(),
875                attestations: Some(attestations),
876                signed_cet,
877            };
878
879            return Ok(Contract::PreClosed(preclosed_contract));
880        } else if confirmations < NB_CONFIRMATIONS {
881            let preclosed_contract = PreClosedContract {
882                signed_contract: contract.clone(),
883                attestations: Some(attestations),
884                signed_cet,
885            };
886
887            return Ok(Contract::PreClosed(preclosed_contract));
888        }
889
890        let closed_contract = ClosedContract {
891            attestations: Some(attestations.to_vec()),
892            pnl: contract.accepted_contract.compute_pnl(&signed_cet),
893            signed_cet: Some(signed_cet),
894            contract_id: contract.accepted_contract.get_contract_id(),
895            temporary_contract_id: contract.accepted_contract.offered_contract.id,
896            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
897        };
898
899        Ok(Contract::Closed(closed_contract))
900    }
901
902    async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
903        // TODO(tibo): should check for confirmation of refund before updating state
904        if contract
905            .accepted_contract
906            .dlc_transactions
907            .refund
908            .lock_time
909            .to_consensus_u32() as u64
910            <= self.time.unix_time_now()
911        {
912            let accepted_contract = &contract.accepted_contract;
913            let refund = accepted_contract.dlc_transactions.refund.clone();
914            let confirmations = self
915                .blockchain
916                .get_transaction_confirmations(&refund.compute_txid())
917                .await?;
918            if confirmations == 0 {
919                let offer = &contract.accepted_contract.offered_contract;
920                let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
921                let refund =
922                    crate::contract_updater::get_signed_refund(&self.secp, contract, &signer)?;
923                self.blockchain.send_transaction(&refund).await?;
924            }
925
926            self.store
927                .update_contract(&Contract::Refunded(contract.clone()))
928                .await?;
929        }
930
931        Ok(())
932    }
933
934    /// Function to call when we detect that a contract was closed by our counter party.
935    /// This will update the state of the contract and return the [`Contract`] object.
936    pub async fn on_counterparty_close(
937        &mut self,
938        contract: &SignedContract,
939        closing_tx: Transaction,
940        confirmations: u32,
941    ) -> Result<Contract, Error> {
942        // check if the closing tx actually spends the funding output
943        if !closing_tx.input.iter().any(|i| {
944            i.previous_output
945                == contract
946                    .accepted_contract
947                    .dlc_transactions
948                    .get_fund_outpoint()
949        }) {
950            return Err(Error::InvalidParameters(
951                "Closing tx does not spend the funding tx".to_string(),
952            ));
953        }
954
955        // check if it is the refund tx (easy case)
956        if contract
957            .accepted_contract
958            .dlc_transactions
959            .refund
960            .compute_txid()
961            == closing_tx.compute_txid()
962        {
963            let refunded = Contract::Refunded(contract.clone());
964            self.store.update_contract(&refunded).await?;
965            return Ok(refunded);
966        }
967
968        let contract = if confirmations < NB_CONFIRMATIONS {
969            Contract::PreClosed(PreClosedContract {
970                signed_contract: contract.clone(),
971                attestations: None, // todo in some cases we can get the attestations from the closing tx
972                signed_cet: closing_tx,
973            })
974        } else {
975            Contract::Closed(ClosedContract {
976                attestations: None, // todo in some cases we can get the attestations from the closing tx
977                pnl: contract.accepted_contract.compute_pnl(&closing_tx),
978                signed_cet: Some(closing_tx),
979                contract_id: contract.accepted_contract.get_contract_id(),
980                temporary_contract_id: contract.accepted_contract.offered_contract.id,
981                counter_party_id: contract.accepted_contract.offered_contract.counter_party,
982            })
983        };
984
985        self.store.update_contract(&contract).await?;
986
987        Ok(contract)
988    }
989}
990
991impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
992    Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
993where
994    W::Target: Wallet,
995    SP::Target: ContractSignerProvider<Signer = X>,
996    B::Target: Blockchain,
997    S::Target: Storage,
998    O::Target: Oracle,
999    T::Target: Time,
1000    F::Target: FeeEstimator,
1001{
1002    /// Create a new channel offer and return the [`dlc_messages::channel::OfferChannel`]
1003    /// message to be sent to the `counter_party`.
1004    pub async fn offer_channel(
1005        &self,
1006        contract_input: &ContractInput,
1007        counter_party: PublicKey,
1008    ) -> Result<OfferChannel, Error> {
1009        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1010
1011        let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1012            &self.secp,
1013            contract_input,
1014            &counter_party,
1015            &oracle_announcements,
1016            CET_NSEQUENCE,
1017            REFUND_DELAY,
1018            &self.wallet,
1019            &self.signer_provider,
1020            &self.blockchain,
1021            &self.time,
1022            crate::utils::get_new_temporary_id(),
1023        )
1024        .await?;
1025
1026        let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1027
1028        self.store
1029            .upsert_channel(
1030                Channel::Offered(offered_channel),
1031                Some(Contract::Offered(offered_contract)),
1032            )
1033            .await?;
1034
1035        Ok(msg)
1036    }
1037
1038    /// Reject a channel that was offered. Returns the [`dlc_messages::channel::Reject`]
1039    /// message to be sent as well as the public key of the offering node.
1040    pub async fn reject_channel(
1041        &self,
1042        channel_id: &ChannelId,
1043    ) -> Result<(Reject, PublicKey), Error> {
1044        let offered_channel =
1045            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1046
1047        if offered_channel.is_offer_party {
1048            return Err(Error::InvalidState(
1049                "Cannot reject channel initiated by us.".to_string(),
1050            ));
1051        }
1052
1053        let offered_contract = get_contract_in_state!(
1054            self,
1055            &offered_channel.offered_contract_id,
1056            Offered,
1057            None as Option<PublicKey>
1058        )?;
1059
1060        let counterparty = offered_channel.counter_party;
1061        self.store
1062            .upsert_channel(
1063                Channel::Cancelled(offered_channel),
1064                Some(Contract::Rejected(offered_contract)),
1065            )
1066            .await?;
1067
1068        let msg = Reject {
1069            channel_id: *channel_id,
1070        };
1071        Ok((msg, counterparty))
1072    }
1073
1074    /// Accept a channel that was offered. Returns the [`dlc_messages::channel::AcceptChannel`]
1075    /// message to be sent, the updated [`crate::ChannelId`] and [`crate::ContractId`],
1076    /// as well as the public key of the offering node.
1077    pub async fn accept_channel(
1078        &self,
1079        channel_id: &ChannelId,
1080    ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1081        let offered_channel =
1082            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1083
1084        if offered_channel.is_offer_party {
1085            return Err(Error::InvalidState(
1086                "Cannot accept channel initiated by us.".to_string(),
1087            ));
1088        }
1089
1090        let offered_contract = get_contract_in_state!(
1091            self,
1092            &offered_channel.offered_contract_id,
1093            Offered,
1094            None as Option<PublicKey>
1095        )?;
1096
1097        let (accepted_channel, accepted_contract, accept_channel) =
1098            crate::channel_updater::accept_channel_offer(
1099                &self.secp,
1100                &offered_channel,
1101                &offered_contract,
1102                &self.wallet,
1103                &self.signer_provider,
1104                &self.blockchain,
1105            )
1106            .await?;
1107
1108        self.wallet.import_address(&Address::p2wsh(
1109            &accepted_contract.dlc_transactions.funding_script_pubkey,
1110            self.blockchain.get_network()?,
1111        ))?;
1112
1113        let channel_id = accepted_channel.channel_id;
1114        let contract_id = accepted_contract.get_contract_id();
1115        let counter_party = accepted_contract.offered_contract.counter_party;
1116
1117        self.store
1118            .upsert_channel(
1119                Channel::Accepted(accepted_channel),
1120                Some(Contract::Accepted(accepted_contract)),
1121            )
1122            .await?;
1123
1124        Ok((accept_channel, channel_id, contract_id, counter_party))
1125    }
1126
1127    /// Force close the channel with given [`crate::ChannelId`].
1128    pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1129        let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1130
1131        self.force_close_channel_internal(channel, true).await
1132    }
1133
1134    /// Offer to settle the balance of a channel so that the counter party gets
1135    /// `counter_payout`. Returns the [`dlc_messages::channel::SettleChannelOffer`]
1136    /// message to be sent and the public key of the counter party node.
1137    pub async fn settle_offer(
1138        &self,
1139        channel_id: &ChannelId,
1140        counter_payout: u64,
1141    ) -> Result<(SettleOffer, PublicKey), Error> {
1142        let mut signed_channel =
1143            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1144
1145        let msg = crate::channel_updater::settle_channel_offer(
1146            &self.secp,
1147            &mut signed_channel,
1148            counter_payout,
1149            PEER_TIMEOUT,
1150            &self.signer_provider,
1151            &self.time,
1152        )?;
1153
1154        let counter_party = signed_channel.counter_party;
1155
1156        self.store
1157            .upsert_channel(Channel::Signed(signed_channel), None)
1158            .await?;
1159
1160        Ok((msg, counter_party))
1161    }
1162
1163    /// Accept a settlement offer, returning the [`SettleAccept`] message to be
1164    /// sent to the node with the returned [`PublicKey`] id.
1165    pub async fn accept_settle_offer(
1166        &self,
1167        channel_id: &ChannelId,
1168    ) -> Result<(SettleAccept, PublicKey), Error> {
1169        let mut signed_channel =
1170            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1171
1172        let msg = crate::channel_updater::settle_channel_accept(
1173            &self.secp,
1174            &mut signed_channel,
1175            CET_NSEQUENCE,
1176            0,
1177            PEER_TIMEOUT,
1178            &self.signer_provider,
1179            &self.time,
1180            &self.chain_monitor,
1181        )
1182        .await?;
1183
1184        let counter_party = signed_channel.counter_party;
1185
1186        self.store
1187            .upsert_channel(Channel::Signed(signed_channel), None)
1188            .await?;
1189
1190        Ok((msg, counter_party))
1191    }
1192
1193    /// Returns a [`RenewOffer`] message as well as the [`PublicKey`] of the
1194    /// counter party's node to offer the establishment of a new contract in the
1195    /// channel.
1196    pub async fn renew_offer(
1197        &self,
1198        channel_id: &ChannelId,
1199        counter_payout: u64,
1200        contract_input: &ContractInput,
1201    ) -> Result<(RenewOffer, PublicKey), Error> {
1202        let mut signed_channel =
1203            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1204
1205        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1206
1207        let (msg, offered_contract) = crate::channel_updater::renew_offer(
1208            &self.secp,
1209            &mut signed_channel,
1210            contract_input,
1211            oracle_announcements,
1212            counter_payout,
1213            REFUND_DELAY,
1214            PEER_TIMEOUT,
1215            CET_NSEQUENCE,
1216            &self.signer_provider,
1217            &self.time,
1218        )?;
1219
1220        let counter_party = offered_contract.counter_party;
1221
1222        self.store
1223            .upsert_channel(
1224                Channel::Signed(signed_channel),
1225                Some(Contract::Offered(offered_contract)),
1226            )
1227            .await?;
1228
1229        Ok((msg, counter_party))
1230    }
1231
1232    /// Accept an offer to renew the contract in the channel. Returns the
1233    /// [`RenewAccept`] message to be sent to the peer with the returned
1234    /// [`PublicKey`] as node id.
1235    pub async fn accept_renew_offer(
1236        &self,
1237        channel_id: &ChannelId,
1238    ) -> Result<(RenewAccept, PublicKey), Error> {
1239        let mut signed_channel =
1240            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1241        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1242            Error::InvalidState("Expected to have a contract id but did not.".to_string())
1243        })?;
1244
1245        let offered_contract = get_contract_in_state!(
1246            self,
1247            &offered_contract_id,
1248            Offered,
1249            None as Option<PublicKey>
1250        )?;
1251
1252        let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1253            &self.secp,
1254            &mut signed_channel,
1255            &offered_contract,
1256            CET_NSEQUENCE,
1257            PEER_TIMEOUT,
1258            &self.signer_provider,
1259            &self.time,
1260        )?;
1261
1262        let counter_party = signed_channel.counter_party;
1263
1264        self.store
1265            .upsert_channel(
1266                Channel::Signed(signed_channel),
1267                Some(Contract::Accepted(accepted_contract)),
1268            )
1269            .await?;
1270
1271        Ok((msg, counter_party))
1272    }
1273
1274    /// Reject an offer to renew the contract in the channel. Returns the
1275    /// [`Reject`] message to be sent to the peer with the returned
1276    /// [`PublicKey`] node id.
1277    pub async fn reject_renew_offer(
1278        &self,
1279        channel_id: &ChannelId,
1280    ) -> Result<(Reject, PublicKey), Error> {
1281        let mut signed_channel =
1282            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1283        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1284            Error::InvalidState(
1285                "Expected to be in a state with an associated contract id but was not.".to_string(),
1286            )
1287        })?;
1288
1289        let offered_contract = get_contract_in_state!(
1290            self,
1291            &offered_contract_id,
1292            Offered,
1293            None as Option<PublicKey>
1294        )?;
1295
1296        let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1297
1298        let counter_party = signed_channel.counter_party;
1299
1300        self.store
1301            .upsert_channel(
1302                Channel::Signed(signed_channel),
1303                Some(Contract::Rejected(offered_contract)),
1304            )
1305            .await?;
1306
1307        Ok((reject_msg, counter_party))
1308    }
1309
1310    /// Returns a [`Reject`] message to be sent to the counter party of the
1311    /// channel to inform them that the local party does not wish to accept the
1312    /// proposed settle offer.
1313    pub async fn reject_settle_offer(
1314        &self,
1315        channel_id: &ChannelId,
1316    ) -> Result<(Reject, PublicKey), Error> {
1317        let mut signed_channel =
1318            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1319
1320        let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1321
1322        let counter_party = signed_channel.counter_party;
1323
1324        self.store
1325            .upsert_channel(Channel::Signed(signed_channel), None)
1326            .await?;
1327
1328        Ok((msg, counter_party))
1329    }
1330
1331    /// Returns a [`CollaborativeCloseOffer`] message to be sent to the counter
1332    /// party of the channel and update the state of the channel. Note that the
1333    /// channel will be forced closed after a timeout if the counter party does
1334    /// not broadcast the close transaction.
1335    pub async fn offer_collaborative_close(
1336        &self,
1337        channel_id: &ChannelId,
1338        counter_payout: u64,
1339    ) -> Result<CollaborativeCloseOffer, Error> {
1340        let mut signed_channel =
1341            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1342
1343        let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
1344            &self.secp,
1345            &mut signed_channel,
1346            counter_payout,
1347            &self.signer_provider,
1348            &self.time,
1349        )?;
1350
1351        self.chain_monitor.lock().await.add_tx(
1352            close_tx.compute_txid(),
1353            ChannelInfo {
1354                channel_id: *channel_id,
1355                tx_type: TxType::CollaborativeClose,
1356            },
1357        );
1358
1359        self.store
1360            .upsert_channel(Channel::Signed(signed_channel), None)
1361            .await?;
1362        self.store
1363            .persist_chain_monitor(&*self.chain_monitor.lock().await)
1364            .await?;
1365
1366        Ok(msg)
1367    }
1368
1369    /// Accept an offer to collaboratively close the channel. The close transaction
1370    /// will be broadcast and the state of the channel updated.
1371    pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
1372        let signed_channel =
1373            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1374
1375        let closed_contract = if let Some(SignedChannelState::Established {
1376            signed_contract_id,
1377            ..
1378        }) = &signed_channel.roll_back_state
1379        {
1380            let counter_payout = get_signed_channel_state!(
1381                signed_channel,
1382                CollaborativeCloseOffered,
1383                counter_payout
1384            )?;
1385            Some(
1386                self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
1387                    .await?,
1388            )
1389        } else {
1390            None
1391        };
1392
1393        let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
1394            &self.secp,
1395            &signed_channel,
1396            &self.signer_provider,
1397        )?;
1398
1399        self.blockchain.send_transaction(&close_tx).await?;
1400
1401        self.store.upsert_channel(closed_channel, None).await?;
1402
1403        if let Some(closed_contract) = closed_contract {
1404            self.store
1405                .update_contract(&Contract::Closed(closed_contract))
1406                .await?;
1407        }
1408
1409        Ok(())
1410    }
1411
1412    async fn try_finalize_closing_established_channel(
1413        &self,
1414        signed_channel: SignedChannel,
1415    ) -> Result<(), Error> {
1416        let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
1417            signed_channel,
1418            Closing,
1419            buffer_transaction,
1420            contract_id,
1421            is_initiator
1422        )?;
1423
1424        if self
1425            .blockchain
1426            .get_transaction_confirmations(&buffer_tx.compute_txid())
1427            .await?
1428            >= CET_NSEQUENCE
1429        {
1430            tracing::info!(
1431                "Buffer transaction for contract {} has enough confirmations to spend from it",
1432                serialize_hex(&contract_id)
1433            );
1434
1435            let confirmed_contract =
1436                get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
1437
1438            let (contract_info, adaptor_info, attestations) = self
1439                .get_closable_contract_info(&confirmed_contract)
1440                .await
1441                .ok_or_else(|| {
1442                    Error::InvalidState("Could not get information to close contract".to_string())
1443                })?;
1444
1445            let (signed_cet, closed_channel) =
1446                crate::channel_updater::finalize_unilateral_close_settled_channel(
1447                    &self.secp,
1448                    &signed_channel,
1449                    &confirmed_contract,
1450                    contract_info,
1451                    &attestations,
1452                    adaptor_info,
1453                    &self.signer_provider,
1454                    is_initiator,
1455                )?;
1456
1457            let closed_contract = self
1458                .close_contract(
1459                    &confirmed_contract,
1460                    signed_cet,
1461                    attestations.iter().map(|x| &x.1).cloned().collect(),
1462                )
1463                .await?;
1464
1465            self.chain_monitor
1466                .lock()
1467                .await
1468                .cleanup_channel(signed_channel.channel_id);
1469
1470            self.store
1471                .upsert_channel(closed_channel, Some(closed_contract))
1472                .await?;
1473        }
1474
1475        Ok(())
1476    }
1477
1478    async fn on_offer_channel(
1479        &self,
1480        offer_channel: &OfferChannel,
1481        counter_party: PublicKey,
1482    ) -> Result<(), Error> {
1483        offer_channel.validate(
1484            &self.secp,
1485            REFUND_DELAY,
1486            REFUND_DELAY * 2,
1487            CET_NSEQUENCE,
1488            CET_NSEQUENCE * 2,
1489        )?;
1490
1491        let keys_id = self
1492            .signer_provider
1493            .derive_signer_key_id(false, offer_channel.temporary_contract_id);
1494        let (channel, contract) =
1495            OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
1496
1497        contract.validate()?;
1498
1499        if self
1500            .store
1501            .get_channel(&channel.temporary_channel_id)
1502            .await?
1503            .is_some()
1504        {
1505            return Err(Error::InvalidParameters(
1506                "Channel with identical idea already in store".to_string(),
1507            ));
1508        }
1509
1510        self.store
1511            .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
1512            .await?;
1513
1514        Ok(())
1515    }
1516
1517    async fn on_accept_channel(
1518        &self,
1519        accept_channel: &AcceptChannel,
1520        peer_id: &PublicKey,
1521    ) -> Result<SignChannel, Error> {
1522        let offered_channel = get_channel_in_state!(
1523            self,
1524            &accept_channel.temporary_channel_id,
1525            Offered,
1526            Some(*peer_id)
1527        )?;
1528        let offered_contract = get_contract_in_state!(
1529            self,
1530            &offered_channel.offered_contract_id,
1531            Offered,
1532            Some(*peer_id)
1533        )?;
1534
1535        let (signed_channel, signed_contract, sign_channel) = {
1536            let res = crate::channel_updater::verify_and_sign_accepted_channel(
1537                &self.secp,
1538                &offered_channel,
1539                &offered_contract,
1540                accept_channel,
1541                //TODO(tibo): this should be parameterizable.
1542                CET_NSEQUENCE,
1543                &self.wallet,
1544                &self.signer_provider,
1545                &self.chain_monitor,
1546            )
1547            .await;
1548
1549            match res {
1550                Ok(res) => res,
1551                Err(e) => {
1552                    let channel = crate::channel::FailedAccept {
1553                        temporary_channel_id: accept_channel.temporary_channel_id,
1554                        error_message: format!("Error validating accept channel: {}", e),
1555                        accept_message: accept_channel.clone(),
1556                        counter_party: *peer_id,
1557                    };
1558                    self.store
1559                        .upsert_channel(Channel::FailedAccept(channel), None)
1560                        .await?;
1561                    return Err(e);
1562                }
1563            }
1564        };
1565
1566        self.wallet.import_address(&Address::p2wsh(
1567            &signed_contract
1568                .accepted_contract
1569                .dlc_transactions
1570                .funding_script_pubkey,
1571            self.blockchain.get_network()?,
1572        ))?;
1573
1574        if let SignedChannelState::Established {
1575            buffer_transaction, ..
1576        } = &signed_channel.state
1577        {
1578            self.chain_monitor.lock().await.add_tx(
1579                buffer_transaction.compute_txid(),
1580                ChannelInfo {
1581                    channel_id: signed_channel.channel_id,
1582                    tx_type: TxType::BufferTx,
1583                },
1584            );
1585        } else {
1586            unreachable!();
1587        }
1588
1589        self.store
1590            .upsert_channel(
1591                Channel::Signed(signed_channel),
1592                Some(Contract::Signed(signed_contract)),
1593            )
1594            .await?;
1595
1596        self.store
1597            .persist_chain_monitor(&*self.chain_monitor.lock().await)
1598            .await?;
1599
1600        Ok(sign_channel)
1601    }
1602
1603    async fn on_sign_channel(
1604        &self,
1605        sign_channel: &SignChannel,
1606        peer_id: &PublicKey,
1607    ) -> Result<(), Error> {
1608        let accepted_channel =
1609            get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
1610        let accepted_contract = get_contract_in_state!(
1611            self,
1612            &accepted_channel.accepted_contract_id,
1613            Accepted,
1614            Some(*peer_id)
1615        )?;
1616
1617        let (signed_channel, signed_contract, signed_fund_tx) = {
1618            let res = verify_signed_channel(
1619                &self.secp,
1620                &accepted_channel,
1621                &accepted_contract,
1622                sign_channel,
1623                &self.wallet,
1624                &self.chain_monitor,
1625            )
1626            .await;
1627
1628            match res {
1629                Ok(res) => res,
1630                Err(e) => {
1631                    let channel = crate::channel::FailedSign {
1632                        channel_id: sign_channel.channel_id,
1633                        error_message: format!("Error validating accept channel: {}", e),
1634                        sign_message: sign_channel.clone(),
1635                        counter_party: *peer_id,
1636                    };
1637                    self.store
1638                        .upsert_channel(Channel::FailedSign(channel), None)
1639                        .await?;
1640                    return Err(e);
1641                }
1642            }
1643        };
1644
1645        if let SignedChannelState::Established {
1646            buffer_transaction, ..
1647        } = &signed_channel.state
1648        {
1649            self.chain_monitor.lock().await.add_tx(
1650                buffer_transaction.compute_txid(),
1651                ChannelInfo {
1652                    channel_id: signed_channel.channel_id,
1653                    tx_type: TxType::BufferTx,
1654                },
1655            );
1656        } else {
1657            unreachable!();
1658        }
1659
1660        self.blockchain.send_transaction(&signed_fund_tx).await?;
1661
1662        self.store
1663            .upsert_channel(
1664                Channel::Signed(signed_channel),
1665                Some(Contract::Signed(signed_contract)),
1666            )
1667            .await?;
1668        self.store
1669            .persist_chain_monitor(&*self.chain_monitor.lock().await)
1670            .await?;
1671
1672        Ok(())
1673    }
1674
1675    async fn on_settle_offer(
1676        &self,
1677        settle_offer: &SettleOffer,
1678        peer_id: &PublicKey,
1679    ) -> Result<Option<Reject>, Error> {
1680        let mut signed_channel =
1681            get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
1682
1683        if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
1684            return Ok(Some(Reject {
1685                channel_id: settle_offer.channel_id,
1686            }));
1687        }
1688
1689        crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
1690
1691        self.store
1692            .upsert_channel(Channel::Signed(signed_channel), None)
1693            .await?;
1694
1695        Ok(None)
1696    }
1697
1698    async fn on_settle_accept(
1699        &self,
1700        settle_accept: &SettleAccept,
1701        peer_id: &PublicKey,
1702    ) -> Result<SettleConfirm, Error> {
1703        let mut signed_channel =
1704            get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
1705
1706        let msg = crate::channel_updater::settle_channel_confirm(
1707            &self.secp,
1708            &mut signed_channel,
1709            settle_accept,
1710            CET_NSEQUENCE,
1711            0,
1712            PEER_TIMEOUT,
1713            &self.signer_provider,
1714            &self.time,
1715            &self.chain_monitor,
1716        )
1717        .await?;
1718
1719        self.store
1720            .upsert_channel(Channel::Signed(signed_channel), None)
1721            .await?;
1722
1723        Ok(msg)
1724    }
1725
1726    async fn on_settle_confirm(
1727        &self,
1728        settle_confirm: &SettleConfirm,
1729        peer_id: &PublicKey,
1730    ) -> Result<SettleFinalize, Error> {
1731        let mut signed_channel =
1732            get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
1733        let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
1734        let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
1735            signed_channel,
1736            Established,
1737            buffer_transaction,
1738            own_buffer_adaptor_signature,
1739            is_offer,
1740            signed_contract_id
1741        )?;
1742
1743        let prev_buffer_txid = prev_buffer_tx.compute_txid();
1744        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
1745        let is_offer = *is_offer;
1746        let signed_contract_id = *signed_contract_id;
1747
1748        let msg = crate::channel_updater::settle_channel_finalize(
1749            &self.secp,
1750            &mut signed_channel,
1751            settle_confirm,
1752            &self.signer_provider,
1753        )?;
1754
1755        self.chain_monitor.lock().await.add_tx(
1756            prev_buffer_txid,
1757            ChannelInfo {
1758                channel_id: signed_channel.channel_id,
1759                tx_type: TxType::Revoked {
1760                    update_idx: signed_channel.update_idx + 1,
1761                    own_adaptor_signature: own_buffer_adaptor_signature,
1762                    is_offer,
1763                    revoked_tx_type: RevokedTxType::Buffer,
1764                },
1765            },
1766        );
1767
1768        let closed_contract = Contract::Closed(
1769            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
1770                .await?,
1771        );
1772
1773        self.store
1774            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
1775            .await?;
1776        self.store
1777            .persist_chain_monitor(&*self.chain_monitor.lock().await)
1778            .await?;
1779
1780        Ok(msg)
1781    }
1782
1783    async fn on_settle_finalize(
1784        &self,
1785        settle_finalize: &SettleFinalize,
1786        peer_id: &PublicKey,
1787    ) -> Result<(), Error> {
1788        let mut signed_channel =
1789            get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
1790        let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
1791        let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
1792            signed_channel,
1793            Established,
1794            buffer_transaction,
1795            own_buffer_adaptor_signature,
1796            is_offer,
1797            signed_contract_id
1798        )?;
1799
1800        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
1801        let is_offer = *is_offer;
1802        let buffer_txid = buffer_tx.compute_txid();
1803        let signed_contract_id = *signed_contract_id;
1804
1805        crate::channel_updater::settle_channel_on_finalize(
1806            &self.secp,
1807            &mut signed_channel,
1808            settle_finalize,
1809        )?;
1810
1811        self.chain_monitor.lock().await.add_tx(
1812            buffer_txid,
1813            ChannelInfo {
1814                channel_id: signed_channel.channel_id,
1815                tx_type: TxType::Revoked {
1816                    update_idx: signed_channel.update_idx + 1,
1817                    own_adaptor_signature: own_buffer_adaptor_signature,
1818                    is_offer,
1819                    revoked_tx_type: RevokedTxType::Buffer,
1820                },
1821            },
1822        );
1823
1824        let closed_contract = Contract::Closed(
1825            self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
1826                .await?,
1827        );
1828        self.store
1829            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
1830            .await?;
1831        self.store
1832            .persist_chain_monitor(&*self.chain_monitor.lock().await)
1833            .await?;
1834
1835        Ok(())
1836    }
1837
1838    async fn on_renew_offer(
1839        &self,
1840        renew_offer: &RenewOffer,
1841        peer_id: &PublicKey,
1842    ) -> Result<Option<Reject>, Error> {
1843        let mut signed_channel =
1844            get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
1845
1846        // Received a renew offer when we already sent one, we reject it.
1847        if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
1848            if is_offer {
1849                return Ok(Some(Reject {
1850                    channel_id: renew_offer.channel_id,
1851                }));
1852            }
1853        }
1854
1855        let offered_contract = crate::channel_updater::on_renew_offer(
1856            &mut signed_channel,
1857            renew_offer,
1858            PEER_TIMEOUT,
1859            &self.time,
1860        )?;
1861
1862        self.store.create_contract(&offered_contract).await?;
1863        self.store
1864            .upsert_channel(Channel::Signed(signed_channel), None)
1865            .await?;
1866
1867        Ok(None)
1868    }
1869
1870    async fn on_renew_accept(
1871        &self,
1872        renew_accept: &RenewAccept,
1873        peer_id: &PublicKey,
1874    ) -> Result<RenewConfirm, Error> {
1875        let mut signed_channel =
1876            get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
1877        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1878            Error::InvalidState(
1879                "Expected to be in a state with an associated contract id but was not.".to_string(),
1880            )
1881        })?;
1882
1883        let offered_contract =
1884            get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
1885
1886        let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
1887            &self.secp,
1888            renew_accept,
1889            &mut signed_channel,
1890            &offered_contract,
1891            CET_NSEQUENCE,
1892            PEER_TIMEOUT,
1893            &self.wallet,
1894            &self.signer_provider,
1895            &self.time,
1896        )?;
1897
1898        // Directly confirmed as we're in a channel the fund tx is already confirmed.
1899        self.store
1900            .upsert_channel(
1901                Channel::Signed(signed_channel),
1902                Some(Contract::Confirmed(signed_contract)),
1903            )
1904            .await?;
1905
1906        Ok(msg)
1907    }
1908
1909    async fn on_renew_confirm(
1910        &self,
1911        renew_confirm: &RenewConfirm,
1912        peer_id: &PublicKey,
1913    ) -> Result<RenewFinalize, Error> {
1914        let mut signed_channel =
1915            get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
1916        let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
1917        let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1918            Error::InvalidState(
1919                "Expected to be in a state with an associated contract id but was not.".to_string(),
1920            )
1921        })?;
1922
1923        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
1924            .roll_back_state
1925            .as_ref()
1926            .expect("to have a rollback state")
1927        {
1928            SignedChannelState::Established {
1929                own_buffer_adaptor_signature,
1930                buffer_transaction,
1931                signed_contract_id,
1932                ..
1933            } => {
1934                let closed_contract = Contract::Closed(
1935                    self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
1936                        .await?,
1937                );
1938                (
1939                    TxType::Revoked {
1940                        update_idx: signed_channel.update_idx,
1941                        own_adaptor_signature: *own_buffer_adaptor_signature,
1942                        is_offer: false,
1943                        revoked_tx_type: RevokedTxType::Buffer,
1944                    },
1945                    buffer_transaction.compute_txid(),
1946                    Some(closed_contract),
1947                )
1948            }
1949            SignedChannelState::Settled {
1950                settle_tx,
1951                own_settle_adaptor_signature,
1952                ..
1953            } => (
1954                TxType::Revoked {
1955                    update_idx: signed_channel.update_idx,
1956                    own_adaptor_signature: *own_settle_adaptor_signature,
1957                    is_offer: false,
1958                    revoked_tx_type: RevokedTxType::Settle,
1959                },
1960                settle_tx.compute_txid(),
1961                None,
1962            ),
1963            s => {
1964                return Err(Error::InvalidState(format!(
1965                    "Expected rollback state Established or Revoked but found {s:?}"
1966                )))
1967            }
1968        };
1969        let accepted_contract =
1970            get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
1971
1972        let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
1973            &self.secp,
1974            &mut signed_channel,
1975            &accepted_contract,
1976            renew_confirm,
1977            PEER_TIMEOUT,
1978            &self.time,
1979            &self.wallet,
1980            &self.signer_provider,
1981            &self.chain_monitor,
1982        )
1983        .await?;
1984
1985        self.chain_monitor.lock().await.add_tx(
1986            prev_tx_id,
1987            ChannelInfo {
1988                channel_id: signed_channel.channel_id,
1989                tx_type,
1990            },
1991        );
1992
1993        // Directly confirmed as we're in a channel the fund tx is already confirmed.
1994        self.store
1995            .upsert_channel(
1996                Channel::Signed(signed_channel),
1997                Some(Contract::Confirmed(signed_contract)),
1998            )
1999            .await?;
2000
2001        self.store
2002            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2003            .await?;
2004
2005        if let Some(closed_contract) = closed_contract {
2006            self.store.update_contract(&closed_contract).await?;
2007        }
2008
2009        Ok(msg)
2010    }
2011
2012    async fn on_renew_finalize(
2013        &self,
2014        renew_finalize: &RenewFinalize,
2015        peer_id: &PublicKey,
2016    ) -> Result<RenewRevoke, Error> {
2017        let mut signed_channel =
2018            get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2019        let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2020
2021        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2022            .roll_back_state
2023            .as_ref()
2024            .expect("to have a rollback state")
2025        {
2026            SignedChannelState::Established {
2027                own_buffer_adaptor_signature,
2028                buffer_transaction,
2029                signed_contract_id,
2030                ..
2031            } => {
2032                let closed_contract = self
2033                    .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2034                    .await?;
2035                (
2036                    TxType::Revoked {
2037                        update_idx: signed_channel.update_idx,
2038                        own_adaptor_signature: *own_buffer_adaptor_signature,
2039                        is_offer: false,
2040                        revoked_tx_type: RevokedTxType::Buffer,
2041                    },
2042                    buffer_transaction.compute_txid(),
2043                    Some(Contract::Closed(closed_contract)),
2044                )
2045            }
2046            SignedChannelState::Settled {
2047                settle_tx,
2048                own_settle_adaptor_signature,
2049                ..
2050            } => (
2051                TxType::Revoked {
2052                    update_idx: signed_channel.update_idx,
2053                    own_adaptor_signature: *own_settle_adaptor_signature,
2054                    is_offer: false,
2055                    revoked_tx_type: RevokedTxType::Settle,
2056                },
2057                settle_tx.compute_txid(),
2058                None,
2059            ),
2060            s => {
2061                return Err(Error::InvalidState(format!(
2062                    "Expected rollback state of Established or Settled but was {s:?}"
2063                )))
2064            }
2065        };
2066
2067        let msg = crate::channel_updater::renew_channel_on_finalize(
2068            &self.secp,
2069            &mut signed_channel,
2070            renew_finalize,
2071            &self.signer_provider,
2072        )?;
2073
2074        self.chain_monitor.lock().await.add_tx(
2075            prev_tx_id,
2076            ChannelInfo {
2077                channel_id: signed_channel.channel_id,
2078                tx_type,
2079            },
2080        );
2081
2082        let buffer_tx =
2083            get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2084
2085        self.chain_monitor.lock().await.add_tx(
2086            buffer_tx.compute_txid(),
2087            ChannelInfo {
2088                channel_id: signed_channel.channel_id,
2089                tx_type: TxType::BufferTx,
2090            },
2091        );
2092
2093        self.store
2094            .upsert_channel(Channel::Signed(signed_channel), None)
2095            .await?;
2096        self.store
2097            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2098            .await?;
2099
2100        if let Some(closed_contract) = closed_contract {
2101            self.store.update_contract(&closed_contract).await?;
2102        }
2103
2104        Ok(msg)
2105    }
2106
2107    async fn on_renew_revoke(
2108        &self,
2109        renew_revoke: &RenewRevoke,
2110        peer_id: &PublicKey,
2111    ) -> Result<(), Error> {
2112        let mut signed_channel =
2113            get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2114
2115        crate::channel_updater::renew_channel_on_revoke(
2116            &self.secp,
2117            &mut signed_channel,
2118            renew_revoke,
2119        )?;
2120
2121        self.store
2122            .upsert_channel(Channel::Signed(signed_channel), None)
2123            .await?;
2124
2125        Ok(())
2126    }
2127
2128    async fn on_collaborative_close_offer(
2129        &self,
2130        close_offer: &CollaborativeCloseOffer,
2131        peer_id: &PublicKey,
2132    ) -> Result<(), Error> {
2133        let mut signed_channel =
2134            get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2135
2136        crate::channel_updater::on_collaborative_close_offer(
2137            &mut signed_channel,
2138            close_offer,
2139            PEER_TIMEOUT,
2140            &self.time,
2141        )?;
2142
2143        self.store
2144            .upsert_channel(Channel::Signed(signed_channel), None)
2145            .await?;
2146
2147        Ok(())
2148    }
2149
2150    async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2151        let channel = self.store.get_channel(&reject.channel_id).await?;
2152
2153        if let Some(channel) = channel {
2154            if channel.get_counter_party_id() != *counter_party {
2155                return Err(Error::InvalidParameters(format!(
2156                    "Peer {:02x?} is not involved with {} {:02x?}.",
2157                    counter_party,
2158                    stringify!(Channel),
2159                    channel.get_id()
2160                )));
2161            }
2162            match channel {
2163                Channel::Offered(offered_channel) => {
2164                    let offered_contract = get_contract_in_state!(
2165                        self,
2166                        &offered_channel.offered_contract_id,
2167                        Offered,
2168                        None as Option<PublicKey>
2169                    )?;
2170                    let utxos = offered_contract
2171                        .funding_inputs
2172                        .iter()
2173                        .map(|funding_input| {
2174                            let txid = Transaction::consensus_decode(
2175                                &mut funding_input.prev_tx.as_slice(),
2176                            )
2177                            .expect("Transaction Decode Error")
2178                            .compute_txid();
2179                            let vout = funding_input.prev_tx_vout;
2180                            OutPoint { txid, vout }
2181                        })
2182                        .collect::<Vec<_>>();
2183
2184                    self.wallet.unreserve_utxos(&utxos)?;
2185
2186                    // remove rejected channel, since nothing has been confirmed on chain yet.
2187                    self.store
2188                        .upsert_channel(
2189                            Channel::Cancelled(offered_channel),
2190                            Some(Contract::Rejected(offered_contract)),
2191                        )
2192                        .await?;
2193                }
2194                Channel::Signed(mut signed_channel) => {
2195                    let contract = match signed_channel.state {
2196                        SignedChannelState::RenewOffered {
2197                            offered_contract_id,
2198                            ..
2199                        } => {
2200                            let offered_contract = get_contract_in_state!(
2201                                self,
2202                                &offered_contract_id,
2203                                Offered,
2204                                None::<PublicKey>
2205                            )?;
2206                            Some(Contract::Rejected(offered_contract))
2207                        }
2208                        _ => None,
2209                    };
2210
2211                    crate::channel_updater::on_reject(&mut signed_channel)?;
2212
2213                    self.store
2214                        .upsert_channel(Channel::Signed(signed_channel), contract)
2215                        .await?;
2216                }
2217                channel => {
2218                    return Err(Error::InvalidState(format!(
2219                        "Not in a state adequate to receive a reject message. {:?}",
2220                        channel
2221                    )))
2222                }
2223            }
2224        } else {
2225            tracing::warn!(
2226                "Couldn't find rejected dlc channel with id: {}",
2227                reject.channel_id.to_lower_hex_string()
2228            );
2229        }
2230
2231        Ok(())
2232    }
2233
2234    async fn channel_checks(&self) -> Result<(), Error> {
2235        let established_closing_channels = self
2236            .store
2237            .get_signed_channels(Some(SignedChannelStateType::Closing))
2238            .await?;
2239
2240        for channel in established_closing_channels {
2241            if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2242                tracing::error!("Error trying to close established channel: {}", e);
2243            }
2244        }
2245
2246        if let Err(e) = self.check_for_timed_out_channels().await {
2247            tracing::error!("Error checking timed out channels {}", e);
2248        }
2249        self.check_for_watched_tx().await
2250    }
2251
2252    async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
2253        check_for_timed_out_channels!(self, RenewOffered);
2254        check_for_timed_out_channels!(self, RenewAccepted);
2255        check_for_timed_out_channels!(self, RenewConfirmed);
2256        check_for_timed_out_channels!(self, SettledOffered);
2257        check_for_timed_out_channels!(self, SettledAccepted);
2258        check_for_timed_out_channels!(self, SettledConfirmed);
2259
2260        Ok(())
2261    }
2262
2263    pub(crate) async fn process_watched_txs(
2264        &self,
2265        watched_txs: Vec<(Transaction, ChannelInfo)>,
2266    ) -> Result<(), Error> {
2267        for (tx, channel_info) in watched_txs {
2268            let mut signed_channel = match get_channel_in_state!(
2269                self,
2270                &channel_info.channel_id,
2271                Signed,
2272                None as Option<PublicKey>
2273            ) {
2274                Ok(c) => c,
2275                Err(e) => {
2276                    tracing::error!(
2277                        "Could not retrieve channel {:?}: {}",
2278                        channel_info.channel_id,
2279                        e
2280                    );
2281                    continue;
2282                }
2283            };
2284
2285            let persist = match channel_info.tx_type {
2286                TxType::BufferTx => {
2287                    // TODO(tibo): should only considered closed after some confirmations.
2288                    // Ideally should save previous state, and maybe restore in
2289                    // case of reorg, though if the counter party has sent the
2290                    // tx to close the channel it is unlikely that the tx will
2291                    // not be part of a future block.
2292
2293                    let contract_id = signed_channel
2294                        .get_contract_id()
2295                        .expect("to have a contract id");
2296                    let mut state = SignedChannelState::Closing {
2297                        buffer_transaction: tx.clone(),
2298                        is_initiator: false,
2299                        contract_id,
2300                        keys_id: signed_channel.keys_id().ok_or_else(|| {
2301                            Error::InvalidState("Expected to have keys_id.".to_string())
2302                        })?,
2303                    };
2304                    std::mem::swap(&mut signed_channel.state, &mut state);
2305
2306                    signed_channel.roll_back_state = Some(state);
2307
2308                    self.store
2309                        .upsert_channel(Channel::Signed(signed_channel), None)
2310                        .await?;
2311
2312                    false
2313                }
2314                TxType::Revoked {
2315                    update_idx,
2316                    own_adaptor_signature,
2317                    is_offer,
2318                    revoked_tx_type,
2319                } => {
2320                    let secret = signed_channel
2321                        .counter_party_commitment_secrets
2322                        .get_secret(update_idx)
2323                        .expect("to be able to retrieve the per update secret");
2324                    let counter_per_update_secret = SecretKey::from_slice(&secret)
2325                        .expect("to be able to parse the counter per update secret.");
2326
2327                    let per_update_seed_pk = signed_channel.own_per_update_seed;
2328
2329                    let per_update_seed_sk = self
2330                        .signer_provider
2331                        .get_secret_key_for_pubkey(&per_update_seed_pk)?;
2332
2333                    let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
2334                        per_update_seed_sk.as_ref(),
2335                        update_idx,
2336                    ))
2337                    .expect("a valid secret key.");
2338
2339                    let per_update_point =
2340                        PublicKey::from_secret_key(&self.secp, &per_update_secret);
2341
2342                    let own_revocation_params = signed_channel.own_points.get_revokable_params(
2343                        &self.secp,
2344                        &signed_channel.counter_points.revocation_basepoint,
2345                        &per_update_point,
2346                    );
2347
2348                    let counter_per_update_point =
2349                        PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
2350
2351                    let base_own_sk = self
2352                        .signer_provider
2353                        .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
2354
2355                    let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
2356
2357                    let counter_revocation_params =
2358                        signed_channel.counter_points.get_revokable_params(
2359                            &self.secp,
2360                            &signed_channel.own_points.revocation_basepoint,
2361                            &counter_per_update_point,
2362                        );
2363
2364                    let witness = if signed_channel.own_params.fund_pubkey
2365                        < signed_channel.counter_params.fund_pubkey
2366                    {
2367                        tx.input[0].witness.to_vec().remove(1)
2368                    } else {
2369                        tx.input[0].witness.to_vec().remove(2)
2370                    };
2371
2372                    let sig_data = witness
2373                        .iter()
2374                        .take(witness.len() - 1)
2375                        .cloned()
2376                        .collect::<Vec<_>>();
2377                    let own_sig = Signature::from_der(&sig_data)?;
2378
2379                    let counter_sk = own_adaptor_signature.recover(
2380                        &self.secp,
2381                        &own_sig,
2382                        &counter_revocation_params.publish_pk.inner,
2383                    )?;
2384
2385                    let own_revocation_base_secret =
2386                        &self.signer_provider.get_secret_key_for_pubkey(
2387                            &signed_channel.own_points.revocation_basepoint,
2388                        )?;
2389
2390                    let counter_revocation_sk = derive_private_revocation_key(
2391                        &self.secp,
2392                        &counter_per_update_secret,
2393                        own_revocation_base_secret,
2394                    );
2395
2396                    let (offer_params, accept_params) = if is_offer {
2397                        (&own_revocation_params, &counter_revocation_params)
2398                    } else {
2399                        (&counter_revocation_params, &own_revocation_params)
2400                    };
2401
2402                    let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
2403                        lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
2404                    ) / 250)
2405                        .into();
2406
2407                    let signed_tx = match revoked_tx_type {
2408                        RevokedTxType::Buffer => {
2409                            dlc::channel::create_and_sign_punish_buffer_transaction(
2410                                &self.secp,
2411                                offer_params,
2412                                accept_params,
2413                                &own_sk,
2414                                &counter_sk,
2415                                &counter_revocation_sk,
2416                                &tx,
2417                                &self.wallet.get_new_address()?,
2418                                0,
2419                                fee_rate_per_vb,
2420                            )?
2421                        }
2422                        RevokedTxType::Settle => {
2423                            dlc::channel::create_and_sign_punish_settle_transaction(
2424                                &self.secp,
2425                                offer_params,
2426                                accept_params,
2427                                &own_sk,
2428                                &counter_sk,
2429                                &counter_revocation_sk,
2430                                &tx,
2431                                &self.wallet.get_new_address()?,
2432                                CET_NSEQUENCE,
2433                                0,
2434                                fee_rate_per_vb,
2435                                is_offer,
2436                            )?
2437                        }
2438                    };
2439
2440                    self.blockchain.send_transaction(&signed_tx).await?;
2441
2442                    let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
2443                        counter_party: signed_channel.counter_party,
2444                        temporary_channel_id: signed_channel.temporary_channel_id,
2445                        channel_id: signed_channel.channel_id,
2446                        punish_txid: signed_tx.compute_txid(),
2447                    });
2448
2449                    //TODO(tibo): should probably make sure the tx is confirmed somewhere before
2450                    //stop watching the cheating tx.
2451                    self.chain_monitor
2452                        .lock()
2453                        .await
2454                        .cleanup_channel(signed_channel.channel_id);
2455                    self.store.upsert_channel(closed_channel, None).await?;
2456                    true
2457                }
2458                TxType::CollaborativeClose => {
2459                    if let Some(SignedChannelState::Established {
2460                        signed_contract_id, ..
2461                    }) = signed_channel.roll_back_state
2462                    {
2463                        let counter_payout = get_signed_channel_state!(
2464                            signed_channel,
2465                            CollaborativeCloseOffered,
2466                            counter_payout
2467                        )?;
2468                        let closed_contract = self
2469                            .get_collaboratively_closed_contract(
2470                                &signed_contract_id,
2471                                *counter_payout,
2472                                false,
2473                            )
2474                            .await?;
2475                        self.store
2476                            .update_contract(&Contract::Closed(closed_contract))
2477                            .await?;
2478                    }
2479                    let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
2480                        counter_party: signed_channel.counter_party,
2481                        temporary_channel_id: signed_channel.temporary_channel_id,
2482                        channel_id: signed_channel.channel_id,
2483                    });
2484                    self.chain_monitor
2485                        .lock()
2486                        .await
2487                        .cleanup_channel(signed_channel.channel_id);
2488                    self.store.upsert_channel(closed_channel, None).await?;
2489                    true
2490                }
2491                TxType::SettleTx => {
2492                    let closed_channel = Channel::CounterClosed(ClosedChannel {
2493                        counter_party: signed_channel.counter_party,
2494                        temporary_channel_id: signed_channel.temporary_channel_id,
2495                        channel_id: signed_channel.channel_id,
2496                    });
2497                    self.chain_monitor
2498                        .lock()
2499                        .await
2500                        .cleanup_channel(signed_channel.channel_id);
2501                    self.store.upsert_channel(closed_channel, None).await?;
2502                    true
2503                }
2504                TxType::Cet => {
2505                    let contract_id = signed_channel.get_contract_id();
2506                    let closed_channel = {
2507                        match &signed_channel.state {
2508                            SignedChannelState::Closing { is_initiator, .. } => {
2509                                if *is_initiator {
2510                                    Channel::Closed(ClosedChannel {
2511                                        counter_party: signed_channel.counter_party,
2512                                        temporary_channel_id: signed_channel.temporary_channel_id,
2513                                        channel_id: signed_channel.channel_id,
2514                                    })
2515                                } else {
2516                                    Channel::CounterClosed(ClosedChannel {
2517                                        counter_party: signed_channel.counter_party,
2518                                        temporary_channel_id: signed_channel.temporary_channel_id,
2519                                        channel_id: signed_channel.channel_id,
2520                                    })
2521                                }
2522                            }
2523                            _ => {
2524                                tracing::error!("Saw spending of buffer transaction without being in closing state");
2525                                Channel::Closed(ClosedChannel {
2526                                    counter_party: signed_channel.counter_party,
2527                                    temporary_channel_id: signed_channel.temporary_channel_id,
2528                                    channel_id: signed_channel.channel_id,
2529                                })
2530                            }
2531                        }
2532                    };
2533
2534                    self.chain_monitor
2535                        .lock()
2536                        .await
2537                        .cleanup_channel(signed_channel.channel_id);
2538                    let pre_closed_contract = futures::stream::iter(contract_id)
2539                        .then(|contract_id| {
2540                            let txn = tx.clone();
2541                            async move {
2542                                self.store.get_contract(&contract_id).await.map(|contract| {
2543                                    contract.and_then(|contract| match contract {
2544                                        Contract::Confirmed(signed_contract) => {
2545                                            Some(Contract::PreClosed(PreClosedContract {
2546                                                signed_contract,
2547                                                attestations: None,
2548                                                signed_cet: txn,
2549                                            }))
2550                                        }
2551                                        _ => None,
2552                                    })
2553                                })
2554                            }
2555                        })
2556                        .try_collect::<Vec<_>>()
2557                        .await?
2558                        .into_iter()
2559                        .flatten()
2560                        .next();
2561
2562                    self.store
2563                        .upsert_channel(closed_channel, pre_closed_contract)
2564                        .await?;
2565
2566                    true
2567                }
2568            };
2569
2570            if persist {
2571                self.store
2572                    .persist_chain_monitor(&*self.chain_monitor.lock().await)
2573                    .await?;
2574            }
2575        }
2576        Ok(())
2577    }
2578
2579    async fn check_for_watched_tx(&self) -> Result<(), Error> {
2580        let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
2581
2582        self.process_watched_txs(confirmed_txs).await?;
2583
2584        self.store
2585            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2586            .await?;
2587
2588        Ok(())
2589    }
2590
2591    async fn force_close_channel_internal(
2592        &self,
2593        mut channel: SignedChannel,
2594        is_initiator: bool,
2595    ) -> Result<(), Error> {
2596        match &channel.state {
2597            SignedChannelState::Established {
2598                counter_buffer_adaptor_signature,
2599                buffer_transaction,
2600                ..
2601            } => {
2602                let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
2603                let buffer_transaction = buffer_transaction.clone();
2604                self.initiate_unilateral_close_established_channel(
2605                    channel,
2606                    is_initiator,
2607                    counter_buffer_adaptor_signature,
2608                    buffer_transaction,
2609                )
2610                .await
2611            }
2612            SignedChannelState::RenewFinalized {
2613                buffer_transaction,
2614                offer_buffer_adaptor_signature,
2615                ..
2616            } => {
2617                let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
2618                let buffer_transaction = buffer_transaction.clone();
2619                self.initiate_unilateral_close_established_channel(
2620                    channel,
2621                    is_initiator,
2622                    offer_buffer_adaptor_signature,
2623                    buffer_transaction,
2624                )
2625                .await
2626            }
2627            SignedChannelState::Settled { .. } => {
2628                self.close_settled_channel(channel, is_initiator).await
2629            }
2630            SignedChannelState::SettledOffered { .. }
2631            | SignedChannelState::SettledReceived { .. }
2632            | SignedChannelState::SettledAccepted { .. }
2633            | SignedChannelState::SettledConfirmed { .. }
2634            | SignedChannelState::RenewOffered { .. }
2635            | SignedChannelState::RenewAccepted { .. }
2636            | SignedChannelState::RenewConfirmed { .. }
2637            | SignedChannelState::CollaborativeCloseOffered { .. } => {
2638                channel.state = channel
2639                    .roll_back_state
2640                    .take()
2641                    .expect("to have a rollback state");
2642                let channel_clone = channel.clone(); // Clone the channel to avoid moving it
2643                Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
2644            }
2645            SignedChannelState::Closing { .. } => Err(Error::InvalidState(
2646                "Channel is already closing.".to_string(),
2647            )),
2648        }
2649    }
2650
2651    /// Initiate the unilateral closing of a channel that has been established.
2652    async fn initiate_unilateral_close_established_channel(
2653        &self,
2654        mut signed_channel: SignedChannel,
2655        is_initiator: bool,
2656        buffer_adaptor_signature: EcdsaAdaptorSignature,
2657        buffer_transaction: Transaction,
2658    ) -> Result<(), Error> {
2659        let keys_id = signed_channel.keys_id().ok_or_else(|| {
2660            Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
2661        })?;
2662
2663        crate::channel_updater::initiate_unilateral_close_established_channel(
2664            &self.secp,
2665            &mut signed_channel,
2666            buffer_adaptor_signature,
2667            keys_id,
2668            buffer_transaction,
2669            &self.signer_provider,
2670            is_initiator,
2671        )?;
2672
2673        let buffer_transaction =
2674            get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
2675
2676        self.blockchain.send_transaction(buffer_transaction).await?;
2677
2678        self.chain_monitor
2679            .lock()
2680            .await
2681            .remove_tx(&buffer_transaction.compute_txid());
2682
2683        self.store
2684            .upsert_channel(Channel::Signed(signed_channel), None)
2685            .await?;
2686
2687        self.store
2688            .persist_chain_monitor(&*self.chain_monitor.lock().await)
2689            .await?;
2690
2691        Ok(())
2692    }
2693
2694    /// Unilaterally close a channel that has been settled.
2695    async fn close_settled_channel(
2696        &self,
2697        signed_channel: SignedChannel,
2698        is_initiator: bool,
2699    ) -> Result<(), Error> {
2700        let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
2701            &self.secp,
2702            &signed_channel,
2703            &self.signer_provider,
2704            is_initiator,
2705        )?;
2706
2707        if self
2708            .blockchain
2709            .get_transaction_confirmations(&settle_tx.compute_txid())
2710            .await
2711            .unwrap_or(0)
2712            == 0
2713        {
2714            self.blockchain.send_transaction(&settle_tx).await?;
2715        }
2716
2717        self.chain_monitor
2718            .lock()
2719            .await
2720            .cleanup_channel(signed_channel.channel_id);
2721
2722        self.store.upsert_channel(closed_channel, None).await?;
2723
2724        Ok(())
2725    }
2726
2727    async fn get_collaboratively_closed_contract(
2728        &self,
2729        contract_id: &ContractId,
2730        payout: u64,
2731        is_own_payout: bool,
2732    ) -> Result<ClosedContract, Error> {
2733        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
2734        let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
2735            contract
2736                .accepted_contract
2737                .offered_contract
2738                .offer_params
2739                .collateral
2740        } else {
2741            contract.accepted_contract.accept_params.collateral
2742        };
2743        let own_payout = if is_own_payout {
2744            payout
2745        } else {
2746            contract.accepted_contract.offered_contract.total_collateral - payout
2747        };
2748        let pnl = own_payout as i64 - own_collateral as i64;
2749        Ok(ClosedContract {
2750            attestations: None,
2751            signed_cet: None,
2752            contract_id: *contract_id,
2753            temporary_contract_id: contract.accepted_contract.offered_contract.id,
2754            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
2755            pnl,
2756        })
2757    }
2758
2759    async fn oracle_announcements(
2760        &self,
2761        contract_input: &ContractInput,
2762    ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
2763        let announcements = stream::iter(contract_input.contract_infos.iter())
2764            .map(|x| {
2765                let future = self.get_oracle_announcements(&x.oracles);
2766                async move {
2767                    match future.await {
2768                        Ok(result) => Ok(result),
2769                        Err(e) => {
2770                            tracing::error!("Failed to get oracle announcements: {}", e);
2771                            Err(e)
2772                        }
2773                    }
2774                }
2775            })
2776            .collect::<FuturesUnordered<_>>()
2777            .await
2778            .try_collect::<Vec<_>>()
2779            .await?;
2780        Ok(announcements)
2781    }
2782}