ddk_manager/
manager.rs

//! # Manager
//!
//! A component to create and update DLCs.
2
3use super::{
4    Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13    accepted_contract::AcceptedContract, contract_info::ContractInfo,
14    contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15    signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16    FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::Address;
26use bitcoin::{OutPoint, Transaction};
27use dlc_messages::channel::{
28    AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
29    RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
30    SettleOffer, SignChannel,
31};
32use dlc_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
33use dlc_messages::{AcceptDlc, Message as DlcMessage, OfferDlc, SignDlc};
34use futures::stream;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt};
37use hex::DisplayHex;
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40    build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use secp256k1_zkp::XOnlyPublicKey;
43use secp256k1_zkp::{
44    ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
45};
46use std::collections::HashMap;
47use std::ops::Deref;
48use std::string::ToString;
49use std::sync::{Arc, Mutex};
50
/// The number of confirmations required before moving to the confirmed state.
pub const NB_CONFIRMATIONS: u32 = 6;
/// The delay to set the refund value to (seven days, assuming the unit is
/// seconds).
pub const REFUND_DELAY: u32 = 86400 * 7;
/// The nSequence value used for CETs in DLC channels
pub const CET_NSEQUENCE: u32 = 288;
/// Timeout in seconds when waiting for a peer's reply, after which a DLC channel
/// is forced closed.
pub const PEER_TIMEOUT: u64 = 3600;
60
/// Outcome of looking for a contract info that can currently be closed.
///
/// When present, holds the selected [`ContractInfo`], the [`AdaptorInfo`]
/// paired with it, and the collected oracle attestations, each tagged with a
/// `usize` index. `None` means no contract info qualified.
type ClosableContractInfo<'a> = Option<(
    &'a ContractInfo,
    &'a AdaptorInfo,
    Vec<(usize, OracleAttestation)>,
)>;
66
67/// Used to create and update DLCs.
68pub struct Manager<
69    W: Deref,
70    SP: Deref,
71    B: Deref,
72    S: Deref,
73    O: Deref,
74    T: Deref,
75    F: Deref,
76    X: ContractSigner,
77> where
78    W::Target: Wallet,
79    SP::Target: ContractSignerProvider<Signer = X>,
80    B::Target: Blockchain,
81    S::Target: Storage,
82    O::Target: Oracle,
83    T::Target: Time,
84    F::Target: FeeEstimator,
85{
86    oracles: HashMap<XOnlyPublicKey, O>,
87    wallet: W,
88    signer_provider: SP,
89    blockchain: B,
90    store: S,
91    secp: Secp256k1<All>,
92    chain_monitor: Mutex<ChainMonitor>,
93    time: T,
94    fee_estimator: F,
95}
96
/// Forwards to `get_object_in_state!`, filling in `Contract` as the object
/// type and `get_contract` as the getter.
macro_rules! get_contract_in_state {
    ($mgr:ident, $id:expr, $variant:ident, $peer:expr) => {{
        get_object_in_state!($mgr, $id, $variant, $peer, Contract, get_contract)
    }};
}
109
/// Forwards to `get_object_in_state!`, filling in `Channel` as the object
/// type and `get_channel` as the getter.
macro_rules! get_channel_in_state {
    ($mgr:ident, $id:expr, $variant:ident, $peer:expr) => {{
        get_object_in_state!($mgr, $id, $variant, $peer, Channel, get_channel)
    }};
}
122
/// Extracts the listed fields from the roll back state of a signed channel.
///
/// Evaluates to `Ok((fields...,))`, borrowing each `$field`, when
/// `$signed_channel.roll_back_state` is `Some(SignedChannelState::$state { .. })`.
/// Otherwise evaluates to `Err(Error::InvalidState(..))` whose message names
/// the expected variant and shows the roll back state actually found.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
       match $signed_channel.roll_back_state.as_ref() {
           Some(SignedChannelState::$state{$($field,)* ..}) => Ok(($($field,)*)),
           // Report the value that was matched on (the roll back state), not
           // the channel's current state.
           _ => Err(Error::InvalidState(format!(
               "Expected rollback state {} got {:?}",
               stringify!($state),
               $signed_channel.roll_back_state
           ))),
        }
    }};
}
131
/// Force closes every stored signed channel in state `$state` whose `timeout`
/// is strictly before the current unix time.
///
/// A failure to load the channels is propagated with `?`. A failure to force
/// close one channel is only logged, and the remaining channels are still
/// processed. Must be expanded inside an `async` context.
macro_rules! check_for_timed_out_channels {
    ($manager: ident, $state: ident) => {
        let candidates = $manager
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))?;

        for candidate in candidates {
            if let SignedChannelState::$state { timeout, .. } = candidate.state {
                if timeout < $manager.time.unix_time_now() {
                    if let Err(e) = $manager
                        .force_close_channel_internal(candidate, true)
                        .await
                    {
                        tracing::error!("Error force closing channel {}", e);
                    }
                }
            }
        }
    };
}
151
152impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
153    Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
154where
155    W::Target: Wallet,
156    SP::Target: ContractSignerProvider<Signer = X>,
157    B::Target: Blockchain,
158    S::Target: Storage,
159    O::Target: Oracle,
160    T::Target: Time,
161    F::Target: FeeEstimator,
162{
163    /// Create a new Manager struct.
164    pub async fn new(
165        wallet: W,
166        signer_provider: SP,
167        blockchain: B,
168        store: S,
169        oracles: HashMap<XOnlyPublicKey, O>,
170        time: T,
171        fee_estimator: F,
172    ) -> Result<Self, Error> {
173        let init_height = blockchain.get_blockchain_height().await?;
174        let chain_monitor = Mutex::new(
175            store
176                .get_chain_monitor()?
177                .unwrap_or(ChainMonitor::new(init_height)),
178        );
179
180        let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
181
182        Ok(Manager {
183            secp: secp256k1_zkp::Secp256k1::new(),
184            wallet,
185            signer_provider,
186            blockchain,
187            store,
188            oracles,
189            time,
190            fee_estimator,
191            chain_monitor,
192        })
193    }
194
    /// Get the store from the Manager to access contracts.
    ///
    /// Returns a shared reference to the storage handle held by the manager.
    pub fn get_store(&self) -> &S {
        &self.store
    }
199
200    /// Function called to pass a DlcMessage to the Manager.
201    pub async fn on_dlc_message(
202        &self,
203        msg: &DlcMessage,
204        counter_party: PublicKey,
205    ) -> Result<Option<DlcMessage>, Error> {
206        match msg {
207            DlcMessage::Offer(o) => {
208                self.on_offer_message(o, counter_party)?;
209                Ok(None)
210            }
211            DlcMessage::Accept(a) => Ok(Some(self.on_accept_message(a, &counter_party)?)),
212            DlcMessage::Sign(s) => {
213                self.on_sign_message(s, &counter_party).await?;
214                Ok(None)
215            }
216            DlcMessage::OfferChannel(o) => {
217                self.on_offer_channel(o, counter_party)?;
218                Ok(None)
219            }
220            DlcMessage::AcceptChannel(a) => Ok(Some(DlcMessage::SignChannel(
221                self.on_accept_channel(a, &counter_party)?,
222            ))),
223            DlcMessage::SignChannel(s) => {
224                self.on_sign_channel(s, &counter_party).await?;
225                Ok(None)
226            }
227            DlcMessage::SettleOffer(s) => match self.on_settle_offer(s, &counter_party)? {
228                Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
229                None => Ok(None),
230            },
231            DlcMessage::SettleAccept(s) => Ok(Some(DlcMessage::SettleConfirm(
232                self.on_settle_accept(s, &counter_party)?,
233            ))),
234            DlcMessage::SettleConfirm(s) => Ok(Some(DlcMessage::SettleFinalize(
235                self.on_settle_confirm(s, &counter_party)?,
236            ))),
237            DlcMessage::SettleFinalize(s) => {
238                self.on_settle_finalize(s, &counter_party)?;
239                Ok(None)
240            }
241            DlcMessage::RenewOffer(r) => match self.on_renew_offer(r, &counter_party)? {
242                Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
243                None => Ok(None),
244            },
245            DlcMessage::RenewAccept(r) => Ok(Some(DlcMessage::RenewConfirm(
246                self.on_renew_accept(r, &counter_party)?,
247            ))),
248            DlcMessage::RenewConfirm(r) => Ok(Some(DlcMessage::RenewFinalize(
249                self.on_renew_confirm(r, &counter_party)?,
250            ))),
251            DlcMessage::RenewFinalize(r) => {
252                let revoke = self.on_renew_finalize(r, &counter_party)?;
253                Ok(Some(DlcMessage::RenewRevoke(revoke)))
254            }
255            DlcMessage::RenewRevoke(r) => {
256                self.on_renew_revoke(r, &counter_party)?;
257                Ok(None)
258            }
259            DlcMessage::CollaborativeCloseOffer(c) => {
260                self.on_collaborative_close_offer(c, &counter_party)?;
261                Ok(None)
262            }
263            DlcMessage::Reject(r) => {
264                self.on_reject(r, &counter_party)?;
265                Ok(None)
266            }
267        }
268    }
269
270    /// Function called to create a new DLC. The offered contract will be stored
271    /// and an OfferDlc message returned.
272    ///
273    /// This function will fetch the oracle announcements from the oracle.
274    pub async fn send_offer(
275        &self,
276        contract_input: &ContractInput,
277        counter_party: PublicKey,
278    ) -> Result<OfferDlc, Error> {
279        // If the oracle announcement fails to retrieve, then log and continue.
280        let oracle_announcements = self.oracle_announcements(contract_input).await?;
281
282        self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
283            .await
284    }
285
286    /// Function called to create a new DLC. The offered contract will be stored
287    /// and an OfferDlc message returned.
288    ///
289    /// This function allows to pass the oracle announcements directly instead of
290    /// fetching them from the oracle.
291    pub async fn send_offer_with_announcements(
292        &self,
293        contract_input: &ContractInput,
294        counter_party: PublicKey,
295        oracle_announcements: Vec<Vec<OracleAnnouncement>>,
296    ) -> Result<OfferDlc, Error> {
297        let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
298            &self.secp,
299            contract_input,
300            oracle_announcements,
301            REFUND_DELAY,
302            &counter_party,
303            &self.wallet,
304            &self.blockchain,
305            &self.time,
306            &self.signer_provider,
307        )
308        .await?;
309
310        offered_contract.validate()?;
311
312        self.store.create_contract(&offered_contract)?;
313
314        Ok(offer_msg)
315    }
316
317    /// Function to call to accept a DLC for which an offer was received.
318    pub async fn accept_contract_offer(
319        &self,
320        contract_id: &ContractId,
321    ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
322        let offered_contract =
323            get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
324
325        let counter_party = offered_contract.counter_party;
326
327        let (accepted_contract, accept_msg) = accept_contract(
328            &self.secp,
329            &offered_contract,
330            &self.wallet,
331            &self.signer_provider,
332            &self.blockchain,
333        )
334        .await?;
335
336        self.wallet.import_address(&Address::p2wsh(
337            &accepted_contract.dlc_transactions.funding_script_pubkey,
338            self.blockchain.get_network()?,
339        ))?;
340
341        let contract_id = accepted_contract.get_contract_id();
342
343        self.store
344            .update_contract(&Contract::Accepted(accepted_contract))?;
345
346        Ok((contract_id, counter_party, accept_msg))
347    }
348
349    /// Function to update the state of the [`ChainMonitor`] with new
350    /// blocks.
351    ///
352    /// Consumers **MUST** call this periodically in order to
353    /// determine when pending transactions reach confirmation.
354    pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
355        let cur_height = self.blockchain.get_blockchain_height().await?;
356        let last_height = self.chain_monitor.lock().unwrap().last_height;
357
358        // TODO(luckysori): We could end up reprocessing a block at
359        // the same height if there is a reorg.
360        if cur_height < last_height {
361            return Err(Error::InvalidState(
362                "Current height is lower than last height.".to_string(),
363            ));
364        }
365
366        for height in last_height + 1..=cur_height {
367            let block = self.blockchain.get_block_at_height(height).await?;
368
369            self.chain_monitor
370                .lock()
371                .unwrap()
372                .process_block(&block, height);
373        }
374
375        Ok(())
376    }
377
378    /// Function to call to check the state of the currently executing DLCs and
379    /// update them if possible.
380    pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
381        tracing::debug!("Periodic check.");
382        self.check_signed_contracts().await?;
383        self.check_confirmed_contracts().await?;
384        self.check_preclosed_contracts().await?;
385
386        if check_channels {
387            self.channel_checks().await?;
388        }
389
390        Ok(())
391    }
392
393    fn on_offer_message(
394        &self,
395        offered_message: &OfferDlc,
396        counter_party: PublicKey,
397    ) -> Result<(), Error> {
398        offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
399        let keys_id = self
400            .signer_provider
401            .derive_signer_key_id(false, offered_message.temporary_contract_id);
402        let contract: OfferedContract =
403            OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
404        contract.validate()?;
405
406        if self.store.get_contract(&contract.id)?.is_some() {
407            return Err(Error::InvalidParameters(
408                "Contract with identical id already exists".to_string(),
409            ));
410        }
411
412        self.store.create_contract(&contract)?;
413
414        Ok(())
415    }
416
417    fn on_accept_message(
418        &self,
419        accept_msg: &AcceptDlc,
420        counter_party: &PublicKey,
421    ) -> Result<DlcMessage, Error> {
422        let offered_contract = get_contract_in_state!(
423            self,
424            &accept_msg.temporary_contract_id,
425            Offered,
426            Some(*counter_party)
427        )?;
428
429        let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
430            &self.secp,
431            &offered_contract,
432            accept_msg,
433            &self.wallet,
434            &self.signer_provider,
435        ) {
436            Ok(contract) => contract,
437            Err(e) => return self.accept_fail_on_error(offered_contract, accept_msg.clone(), e),
438        };
439
440        self.wallet.import_address(&Address::p2wsh(
441            &signed_contract
442                .accepted_contract
443                .dlc_transactions
444                .funding_script_pubkey,
445            self.blockchain.get_network()?,
446        ))?;
447
448        self.store
449            .update_contract(&Contract::Signed(signed_contract))?;
450
451        Ok(DlcMessage::Sign(signed_msg))
452    }
453
454    async fn on_sign_message(
455        &self,
456        sign_message: &SignDlc,
457        peer_id: &PublicKey,
458    ) -> Result<(), Error> {
459        let accepted_contract =
460            get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
461
462        let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
463            &self.secp,
464            &accepted_contract,
465            sign_message,
466            &self.wallet,
467        ) {
468            Ok(contract) => contract,
469            Err(e) => return self.sign_fail_on_error(accepted_contract, sign_message.clone(), e),
470        };
471
472        self.store
473            .update_contract(&Contract::Signed(signed_contract))?;
474
475        self.blockchain.send_transaction(&fund_tx).await?;
476
477        Ok(())
478    }
479
480    async fn get_oracle_announcements(
481        &self,
482        oracle_inputs: &OracleInput,
483    ) -> Result<Vec<OracleAnnouncement>, Error> {
484        let mut announcements = Vec::new();
485        for pubkey in &oracle_inputs.public_keys {
486            let oracle = self
487                .oracles
488                .get(pubkey)
489                .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
490            let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
491            announcements.push(announcement);
492        }
493
494        Ok(announcements)
495    }
496
497    fn sign_fail_on_error<R>(
498        &self,
499        accepted_contract: AcceptedContract,
500        sign_message: SignDlc,
501        e: Error,
502    ) -> Result<R, Error> {
503        tracing::error!("Error in on_sign {}", e);
504        self.store
505            .update_contract(&Contract::FailedSign(FailedSignContract {
506                accepted_contract,
507                sign_message,
508                error_message: e.to_string(),
509            }))?;
510        Err(e)
511    }
512
513    fn accept_fail_on_error<R>(
514        &self,
515        offered_contract: OfferedContract,
516        accept_message: AcceptDlc,
517        e: Error,
518    ) -> Result<R, Error> {
519        tracing::error!("Error in on_accept {}", e);
520        self.store
521            .update_contract(&Contract::FailedAccept(FailedAcceptContract {
522                offered_contract,
523                accept_message,
524                error_message: e.to_string(),
525            }))?;
526        Err(e)
527    }
528
529    async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
530        let confirmations = self
531            .blockchain
532            .get_transaction_confirmations(
533                &contract
534                    .accepted_contract
535                    .dlc_transactions
536                    .fund
537                    .compute_txid(),
538            )
539            .await?;
540        if confirmations >= NB_CONFIRMATIONS {
541            tracing::info!(
542                confirmations,
543                contract_id = contract.accepted_contract.get_contract_id_string(),
544                "Marking contract as confirmed."
545            );
546            self.store
547                .update_contract(&Contract::Confirmed(contract.clone()))?;
548        } else {
549            tracing::info!(
550                confirmations,
551                required = NB_CONFIRMATIONS,
552                contract_id = contract.accepted_contract.get_contract_id_string(),
553                "Not enough confirmations to mark contract as confirmed."
554            );
555        }
556        Ok(())
557    }
558
559    async fn check_signed_contracts(&self) -> Result<(), Error> {
560        for c in self.store.get_signed_contracts()? {
561            if let Err(e) = self.check_signed_contract(&c).await {
562                tracing::error!(
563                    "Error checking confirmed contract {}: {}",
564                    c.accepted_contract.get_contract_id_string(),
565                    e
566                )
567            }
568        }
569
570        Ok(())
571    }
572
573    async fn check_confirmed_contracts(&self) -> Result<(), Error> {
574        for c in self.store.get_confirmed_contracts()? {
575            // Confirmed contracts from channel are processed in channel specific methods.
576            if c.channel_id.is_some() {
577                continue;
578            }
579            if let Err(e) = self.check_confirmed_contract(&c).await {
580                tracing::error!(
581                    "Error checking confirmed contract {}: {}",
582                    c.accepted_contract.get_contract_id_string(),
583                    e
584                )
585            }
586        }
587
588        Ok(())
589    }
590
591    async fn get_closable_contract_info<'a>(
592        &'a self,
593        contract: &'a SignedContract,
594    ) -> ClosableContractInfo<'a> {
595        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
596        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
597        for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
598            let matured: Vec<_> = contract_info
599                .oracle_announcements
600                .iter()
601                .filter(|x| {
602                    (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
603                })
604                .enumerate()
605                .collect();
606            if matured.len() >= contract_info.threshold {
607                let attestations = stream::iter(matured.iter())
608                    .map(|(i, announcement)| async move {
609                        // First try to get the oracle
610                        let oracle = match self.oracles.get(&announcement.oracle_public_key) {
611                            Some(oracle) => oracle,
612                            None => {
613                                tracing::debug!(
614                                    "Oracle not found for key: {}",
615                                    announcement.oracle_public_key
616                                );
617                                return None;
618                            }
619                        };
620
621                        // Then try to get the attestation
622                        let attestation = match oracle
623                            .get_attestation(&announcement.oracle_event.event_id)
624                            .await
625                        {
626                            Ok(attestation) => attestation,
627                            Err(e) => {
628                                tracing::error!(
629                                    "Attestation not found for event. id={} error={}",
630                                    announcement.oracle_event.event_id,
631                                    e.to_string()
632                                );
633                                return None;
634                            }
635                        };
636
637                        // Validate the attestation
638                        if let Err(e) = attestation.validate(&self.secp, announcement) {
639                            tracing::error!(
640                                "Oracle attestation is not valid. pubkey={} event_id={}, error={:?}",
641                                announcement.oracle_public_key,
642                                announcement.oracle_event.event_id,
643                                e
644                            );
645                            return None;
646                        }
647
648                        Some((*i, attestation))
649                    })
650                    .collect::<FuturesUnordered<_>>()
651                    .await
652                    .filter_map(|result| async move { result }) // Filter out None values
653                    .collect::<Vec<_>>()
654                    .await;
655                if attestations.len() >= contract_info.threshold {
656                    return Some((contract_info, adaptor_info, attestations));
657                }
658            }
659        }
660        None
661    }
662
663    async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
664        let closable_contract_info = self.get_closable_contract_info(contract).await;
665        if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
666            let offer = &contract.accepted_contract.offered_contract;
667            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
668
669            //  === WARNING ===
670            // This code could potentially be problematic. When running refund tests, it would look for a CET
671            // but the CET would be invalid and refund would not pass. By only updating with a valid CET,
672            // we then go to update. This way if it fails we can check for refund instead of bailing and getting locked
673            // funds.
674            if let Ok(cet) = crate::contract_updater::get_signed_cet(
675                &self.secp,
676                contract,
677                contract_info,
678                adaptor_info,
679                &attestations,
680                &signer,
681            ) {
682                match self
683                    .close_contract(
684                        contract,
685                        cet,
686                        attestations.iter().map(|x| x.1.clone()).collect(),
687                    )
688                    .await
689                {
690                    Ok(closed_contract) => {
691                        self.store.update_contract(&closed_contract)?;
692                        return Ok(());
693                    }
694                    Err(e) => {
695                        tracing::warn!(
696                            "Failed to close contract {}: {}",
697                            contract.accepted_contract.get_contract_id_string(),
698                            e
699                        );
700                        return Err(e);
701                    }
702                }
703            }
704        }
705
706        self.check_refund(contract).await?;
707
708        Ok(())
709    }
710
711    /// Manually close a contract with the oracle attestations.
712    pub async fn close_confirmed_contract(
713        &self,
714        contract_id: &ContractId,
715        attestations: Vec<(usize, OracleAttestation)>,
716    ) -> Result<Contract, Error> {
717        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
718        let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
719        let adaptor_infos = &contract.accepted_contract.adaptor_infos;
720
721        // find the contract info that matches the attestations
722        if let Some((contract_info, adaptor_info)) =
723            contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
724                let matches = attestations
725                    .iter()
726                    .filter(|(i, a)| {
727                        c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
728                    })
729                    .count();
730
731                matches >= c.threshold
732            })
733        {
734            let offer = &contract.accepted_contract.offered_contract;
735            let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
736            let cet = crate::contract_updater::get_signed_cet(
737                &self.secp,
738                &contract,
739                contract_info,
740                adaptor_info,
741                &attestations,
742                &signer,
743            )?;
744
745            // Check that the lock time has passed
746            let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
747                .expect("Time is not in valid range. This should never happen.");
748            let height =
749                Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
750                    .expect("Height is not in valid range. This should never happen.");
751            let locktime = cet.lock_time;
752
753            if !locktime.is_satisfied_by(height, time) {
754                return Err(Error::InvalidState(
755                    "CET lock time has not passed yet".to_string(),
756                ));
757            }
758
759            match self
760                .close_contract(
761                    &contract,
762                    cet,
763                    attestations.into_iter().map(|x| x.1).collect(),
764                )
765                .await
766            {
767                Ok(closed_contract) => {
768                    self.store.update_contract(&closed_contract)?;
769                    Ok(closed_contract)
770                }
771                Err(e) => {
772                    tracing::warn!(
773                        "Failed to close contract {}: {e}",
774                        contract.accepted_contract.get_contract_id_string()
775                    );
776                    Err(e)
777                }
778            }
779        } else {
780            Err(Error::InvalidState(
781                "Attestations did not match contract infos".to_string(),
782            ))
783        }
784    }
785
786    async fn check_preclosed_contracts(&self) -> Result<(), Error> {
787        for c in self.store.get_preclosed_contracts()? {
788            if let Err(e) = self.check_preclosed_contract(&c).await {
789                tracing::error!(
790                    "Error checking pre-closed contract {}: {}",
791                    c.signed_contract.accepted_contract.get_contract_id_string(),
792                    e
793                )
794            }
795        }
796
797        Ok(())
798    }
799
800    async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
801        let broadcasted_txid = contract.signed_cet.compute_txid();
802        let confirmations = self
803            .blockchain
804            .get_transaction_confirmations(&broadcasted_txid)
805            .await?;
806        if confirmations >= NB_CONFIRMATIONS {
807            let closed_contract = ClosedContract {
808                attestations: contract.attestations.clone(),
809                signed_cet: Some(contract.signed_cet.clone()),
810                contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
811                temporary_contract_id: contract
812                    .signed_contract
813                    .accepted_contract
814                    .offered_contract
815                    .id,
816                counter_party_id: contract
817                    .signed_contract
818                    .accepted_contract
819                    .offered_contract
820                    .counter_party,
821                pnl: contract
822                    .signed_contract
823                    .accepted_contract
824                    .compute_pnl(&contract.signed_cet),
825            };
826            self.store
827                .update_contract(&Contract::Closed(closed_contract))?;
828        }
829
830        Ok(())
831    }
832
833    async fn close_contract(
834        &self,
835        contract: &SignedContract,
836        signed_cet: Transaction,
837        attestations: Vec<OracleAttestation>,
838    ) -> Result<Contract, Error> {
839        let confirmations = self
840            .blockchain
841            .get_transaction_confirmations(&signed_cet.compute_txid())
842            .await?;
843
844        if confirmations < 1 {
845            tracing::info!(
846                txid = signed_cet.compute_txid().to_string(),
847                "Broadcasting signed CET."
848            );
849            // TODO(tibo): if this fails because another tx is already in
850            // mempool or blockchain, we might have been cheated. There is
851            // not much to be done apart from possibly extracting a fraud
852            // proof but ideally it should be handled.
853            self.blockchain.send_transaction(&signed_cet).await?;
854
855            let preclosed_contract = PreClosedContract {
856                signed_contract: contract.clone(),
857                attestations: Some(attestations),
858                signed_cet,
859            };
860
861            return Ok(Contract::PreClosed(preclosed_contract));
862        } else if confirmations < NB_CONFIRMATIONS {
863            let preclosed_contract = PreClosedContract {
864                signed_contract: contract.clone(),
865                attestations: Some(attestations),
866                signed_cet,
867            };
868
869            return Ok(Contract::PreClosed(preclosed_contract));
870        }
871
872        let closed_contract = ClosedContract {
873            attestations: Some(attestations.to_vec()),
874            pnl: contract.accepted_contract.compute_pnl(&signed_cet),
875            signed_cet: Some(signed_cet),
876            contract_id: contract.accepted_contract.get_contract_id(),
877            temporary_contract_id: contract.accepted_contract.offered_contract.id,
878            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
879        };
880
881        Ok(Contract::Closed(closed_contract))
882    }
883
884    async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
885        // TODO(tibo): should check for confirmation of refund before updating state
886        if contract
887            .accepted_contract
888            .dlc_transactions
889            .refund
890            .lock_time
891            .to_consensus_u32() as u64
892            <= self.time.unix_time_now()
893        {
894            let accepted_contract = &contract.accepted_contract;
895            let refund = accepted_contract.dlc_transactions.refund.clone();
896            let confirmations = self
897                .blockchain
898                .get_transaction_confirmations(&refund.compute_txid())
899                .await?;
900            if confirmations == 0 {
901                let offer = &contract.accepted_contract.offered_contract;
902                let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
903                let refund =
904                    crate::contract_updater::get_signed_refund(&self.secp, contract, &signer)?;
905                self.blockchain.send_transaction(&refund).await?;
906            }
907
908            self.store
909                .update_contract(&Contract::Refunded(contract.clone()))?;
910        }
911
912        Ok(())
913    }
914
915    /// Function to call when we detect that a contract was closed by our counter party.
916    /// This will update the state of the contract and return the [`Contract`] object.
917    pub fn on_counterparty_close(
918        &mut self,
919        contract: &SignedContract,
920        closing_tx: Transaction,
921        confirmations: u32,
922    ) -> Result<Contract, Error> {
923        // check if the closing tx actually spends the funding output
924        if !closing_tx.input.iter().any(|i| {
925            i.previous_output
926                == contract
927                    .accepted_contract
928                    .dlc_transactions
929                    .get_fund_outpoint()
930        }) {
931            return Err(Error::InvalidParameters(
932                "Closing tx does not spend the funding tx".to_string(),
933            ));
934        }
935
936        // check if it is the refund tx (easy case)
937        if contract
938            .accepted_contract
939            .dlc_transactions
940            .refund
941            .compute_txid()
942            == closing_tx.compute_txid()
943        {
944            let refunded = Contract::Refunded(contract.clone());
945            self.store.update_contract(&refunded)?;
946            return Ok(refunded);
947        }
948
949        let contract = if confirmations < NB_CONFIRMATIONS {
950            Contract::PreClosed(PreClosedContract {
951                signed_contract: contract.clone(),
952                attestations: None, // todo in some cases we can get the attestations from the closing tx
953                signed_cet: closing_tx,
954            })
955        } else {
956            Contract::Closed(ClosedContract {
957                attestations: None, // todo in some cases we can get the attestations from the closing tx
958                pnl: contract.accepted_contract.compute_pnl(&closing_tx),
959                signed_cet: Some(closing_tx),
960                contract_id: contract.accepted_contract.get_contract_id(),
961                temporary_contract_id: contract.accepted_contract.offered_contract.id,
962                counter_party_id: contract.accepted_contract.offered_contract.counter_party,
963            })
964        };
965
966        self.store.update_contract(&contract)?;
967
968        Ok(contract)
969    }
970}
971
972impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
973    Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
974where
975    W::Target: Wallet,
976    SP::Target: ContractSignerProvider<Signer = X>,
977    B::Target: Blockchain,
978    S::Target: Storage,
979    O::Target: Oracle,
980    T::Target: Time,
981    F::Target: FeeEstimator,
982{
983    /// Create a new channel offer and return the [`dlc_messages::channel::OfferChannel`]
984    /// message to be sent to the `counter_party`.
985    pub async fn offer_channel(
986        &self,
987        contract_input: &ContractInput,
988        counter_party: PublicKey,
989    ) -> Result<OfferChannel, Error> {
990        let oracle_announcements = self.oracle_announcements(contract_input).await?;
991
992        let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
993            &self.secp,
994            contract_input,
995            &counter_party,
996            &oracle_announcements,
997            CET_NSEQUENCE,
998            REFUND_DELAY,
999            &self.wallet,
1000            &self.signer_provider,
1001            &self.blockchain,
1002            &self.time,
1003            crate::utils::get_new_temporary_id(),
1004        )
1005        .await?;
1006
1007        let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1008
1009        self.store.upsert_channel(
1010            Channel::Offered(offered_channel),
1011            Some(Contract::Offered(offered_contract)),
1012        )?;
1013
1014        Ok(msg)
1015    }
1016
1017    /// Reject a channel that was offered. Returns the [`dlc_messages::channel::Reject`]
1018    /// message to be sent as well as the public key of the offering node.
1019    pub fn reject_channel(&self, channel_id: &ChannelId) -> Result<(Reject, PublicKey), Error> {
1020        let offered_channel =
1021            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1022
1023        if offered_channel.is_offer_party {
1024            return Err(Error::InvalidState(
1025                "Cannot reject channel initiated by us.".to_string(),
1026            ));
1027        }
1028
1029        let offered_contract = get_contract_in_state!(
1030            self,
1031            &offered_channel.offered_contract_id,
1032            Offered,
1033            None as Option<PublicKey>
1034        )?;
1035
1036        let counterparty = offered_channel.counter_party;
1037        self.store.upsert_channel(
1038            Channel::Cancelled(offered_channel),
1039            Some(Contract::Rejected(offered_contract)),
1040        )?;
1041
1042        let msg = Reject {
1043            channel_id: *channel_id,
1044        };
1045        Ok((msg, counterparty))
1046    }
1047
1048    /// Accept a channel that was offered. Returns the [`dlc_messages::channel::AcceptChannel`]
1049    /// message to be sent, the updated [`crate::ChannelId`] and [`crate::ContractId`],
1050    /// as well as the public key of the offering node.
1051    pub async fn accept_channel(
1052        &self,
1053        channel_id: &ChannelId,
1054    ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1055        let offered_channel =
1056            get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1057
1058        if offered_channel.is_offer_party {
1059            return Err(Error::InvalidState(
1060                "Cannot accept channel initiated by us.".to_string(),
1061            ));
1062        }
1063
1064        let offered_contract = get_contract_in_state!(
1065            self,
1066            &offered_channel.offered_contract_id,
1067            Offered,
1068            None as Option<PublicKey>
1069        )?;
1070
1071        let (accepted_channel, accepted_contract, accept_channel) =
1072            crate::channel_updater::accept_channel_offer(
1073                &self.secp,
1074                &offered_channel,
1075                &offered_contract,
1076                &self.wallet,
1077                &self.signer_provider,
1078                &self.blockchain,
1079            )
1080            .await?;
1081
1082        self.wallet.import_address(&Address::p2wsh(
1083            &accepted_contract.dlc_transactions.funding_script_pubkey,
1084            self.blockchain.get_network()?,
1085        ))?;
1086
1087        let channel_id = accepted_channel.channel_id;
1088        let contract_id = accepted_contract.get_contract_id();
1089        let counter_party = accepted_contract.offered_contract.counter_party;
1090
1091        self.store.upsert_channel(
1092            Channel::Accepted(accepted_channel),
1093            Some(Contract::Accepted(accepted_contract)),
1094        )?;
1095
1096        Ok((accept_channel, channel_id, contract_id, counter_party))
1097    }
1098
1099    /// Force close the channel with given [`crate::ChannelId`].
1100    pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1101        let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1102
1103        self.force_close_channel_internal(channel, true).await
1104    }
1105
1106    /// Offer to settle the balance of a channel so that the counter party gets
1107    /// `counter_payout`. Returns the [`dlc_messages::channel::SettleChannelOffer`]
1108    /// message to be sent and the public key of the counter party node.
1109    pub fn settle_offer(
1110        &self,
1111        channel_id: &ChannelId,
1112        counter_payout: u64,
1113    ) -> Result<(SettleOffer, PublicKey), Error> {
1114        let mut signed_channel =
1115            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1116
1117        let msg = crate::channel_updater::settle_channel_offer(
1118            &self.secp,
1119            &mut signed_channel,
1120            counter_payout,
1121            PEER_TIMEOUT,
1122            &self.signer_provider,
1123            &self.time,
1124        )?;
1125
1126        let counter_party = signed_channel.counter_party;
1127
1128        self.store
1129            .upsert_channel(Channel::Signed(signed_channel), None)?;
1130
1131        Ok((msg, counter_party))
1132    }
1133
1134    /// Accept a settlement offer, returning the [`SettleAccept`] message to be
1135    /// sent to the node with the returned [`PublicKey`] id.
1136    pub fn accept_settle_offer(
1137        &self,
1138        channel_id: &ChannelId,
1139    ) -> Result<(SettleAccept, PublicKey), Error> {
1140        let mut signed_channel =
1141            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1142
1143        let msg = crate::channel_updater::settle_channel_accept(
1144            &self.secp,
1145            &mut signed_channel,
1146            CET_NSEQUENCE,
1147            0,
1148            PEER_TIMEOUT,
1149            &self.signer_provider,
1150            &self.time,
1151            &self.chain_monitor,
1152        )?;
1153
1154        let counter_party = signed_channel.counter_party;
1155
1156        self.store
1157            .upsert_channel(Channel::Signed(signed_channel), None)?;
1158
1159        Ok((msg, counter_party))
1160    }
1161
1162    /// Returns a [`RenewOffer`] message as well as the [`PublicKey`] of the
1163    /// counter party's node to offer the establishment of a new contract in the
1164    /// channel.
1165    pub async fn renew_offer(
1166        &self,
1167        channel_id: &ChannelId,
1168        counter_payout: u64,
1169        contract_input: &ContractInput,
1170    ) -> Result<(RenewOffer, PublicKey), Error> {
1171        let mut signed_channel =
1172            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1173
1174        let oracle_announcements = self.oracle_announcements(contract_input).await?;
1175
1176        let (msg, offered_contract) = crate::channel_updater::renew_offer(
1177            &self.secp,
1178            &mut signed_channel,
1179            contract_input,
1180            oracle_announcements,
1181            counter_payout,
1182            REFUND_DELAY,
1183            PEER_TIMEOUT,
1184            CET_NSEQUENCE,
1185            &self.signer_provider,
1186            &self.time,
1187        )?;
1188
1189        let counter_party = offered_contract.counter_party;
1190
1191        self.store.upsert_channel(
1192            Channel::Signed(signed_channel),
1193            Some(Contract::Offered(offered_contract)),
1194        )?;
1195
1196        Ok((msg, counter_party))
1197    }
1198
1199    /// Accept an offer to renew the contract in the channel. Returns the
1200    /// [`RenewAccept`] message to be sent to the peer with the returned
1201    /// [`PublicKey`] as node id.
1202    pub fn accept_renew_offer(
1203        &self,
1204        channel_id: &ChannelId,
1205    ) -> Result<(RenewAccept, PublicKey), Error> {
1206        let mut signed_channel =
1207            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1208        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1209            Error::InvalidState("Expected to have a contract id but did not.".to_string())
1210        })?;
1211
1212        let offered_contract = get_contract_in_state!(
1213            self,
1214            &offered_contract_id,
1215            Offered,
1216            None as Option<PublicKey>
1217        )?;
1218
1219        let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1220            &self.secp,
1221            &mut signed_channel,
1222            &offered_contract,
1223            CET_NSEQUENCE,
1224            PEER_TIMEOUT,
1225            &self.signer_provider,
1226            &self.time,
1227        )?;
1228
1229        let counter_party = signed_channel.counter_party;
1230
1231        self.store.upsert_channel(
1232            Channel::Signed(signed_channel),
1233            Some(Contract::Accepted(accepted_contract)),
1234        )?;
1235
1236        Ok((msg, counter_party))
1237    }
1238
1239    /// Reject an offer to renew the contract in the channel. Returns the
1240    /// [`Reject`] message to be sent to the peer with the returned
1241    /// [`PublicKey`] node id.
1242    pub fn reject_renew_offer(&self, channel_id: &ChannelId) -> Result<(Reject, PublicKey), Error> {
1243        let mut signed_channel =
1244            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1245        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1246            Error::InvalidState(
1247                "Expected to be in a state with an associated contract id but was not.".to_string(),
1248            )
1249        })?;
1250
1251        let offered_contract = get_contract_in_state!(
1252            self,
1253            &offered_contract_id,
1254            Offered,
1255            None as Option<PublicKey>
1256        )?;
1257
1258        let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1259
1260        let counter_party = signed_channel.counter_party;
1261
1262        self.store.upsert_channel(
1263            Channel::Signed(signed_channel),
1264            Some(Contract::Rejected(offered_contract)),
1265        )?;
1266
1267        Ok((reject_msg, counter_party))
1268    }
1269
1270    /// Returns a [`Reject`] message to be sent to the counter party of the
1271    /// channel to inform them that the local party does not wish to accept the
1272    /// proposed settle offer.
1273    pub fn reject_settle_offer(
1274        &self,
1275        channel_id: &ChannelId,
1276    ) -> Result<(Reject, PublicKey), Error> {
1277        let mut signed_channel =
1278            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1279
1280        let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1281
1282        let counter_party = signed_channel.counter_party;
1283
1284        self.store
1285            .upsert_channel(Channel::Signed(signed_channel), None)?;
1286
1287        Ok((msg, counter_party))
1288    }
1289
1290    /// Returns a [`CollaborativeCloseOffer`] message to be sent to the counter
1291    /// party of the channel and update the state of the channel. Note that the
1292    /// channel will be forced closed after a timeout if the counter party does
1293    /// not broadcast the close transaction.
1294    pub fn offer_collaborative_close(
1295        &self,
1296        channel_id: &ChannelId,
1297        counter_payout: u64,
1298    ) -> Result<CollaborativeCloseOffer, Error> {
1299        let mut signed_channel =
1300            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1301
1302        let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
1303            &self.secp,
1304            &mut signed_channel,
1305            counter_payout,
1306            &self.signer_provider,
1307            &self.time,
1308        )?;
1309
1310        self.chain_monitor.lock().unwrap().add_tx(
1311            close_tx.compute_txid(),
1312            ChannelInfo {
1313                channel_id: *channel_id,
1314                tx_type: TxType::CollaborativeClose,
1315            },
1316        );
1317
1318        self.store
1319            .upsert_channel(Channel::Signed(signed_channel), None)?;
1320        self.store
1321            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
1322
1323        Ok(msg)
1324    }
1325
1326    /// Accept an offer to collaboratively close the channel. The close transaction
1327    /// will be broadcast and the state of the channel updated.
1328    pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
1329        let signed_channel =
1330            get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1331
1332        let closed_contract = if let Some(SignedChannelState::Established {
1333            signed_contract_id,
1334            ..
1335        }) = &signed_channel.roll_back_state
1336        {
1337            let counter_payout = get_signed_channel_state!(
1338                signed_channel,
1339                CollaborativeCloseOffered,
1340                counter_payout
1341            )?;
1342            Some(self.get_collaboratively_closed_contract(
1343                signed_contract_id,
1344                *counter_payout,
1345                true,
1346            )?)
1347        } else {
1348            None
1349        };
1350
1351        let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
1352            &self.secp,
1353            &signed_channel,
1354            &self.signer_provider,
1355        )?;
1356
1357        self.blockchain.send_transaction(&close_tx).await?;
1358
1359        self.store.upsert_channel(closed_channel, None)?;
1360
1361        if let Some(closed_contract) = closed_contract {
1362            self.store
1363                .update_contract(&Contract::Closed(closed_contract))?;
1364        }
1365
1366        Ok(())
1367    }
1368
1369    async fn try_finalize_closing_established_channel(
1370        &self,
1371        signed_channel: SignedChannel,
1372    ) -> Result<(), Error> {
1373        let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
1374            signed_channel,
1375            Closing,
1376            buffer_transaction,
1377            contract_id,
1378            is_initiator
1379        )?;
1380
1381        if self
1382            .blockchain
1383            .get_transaction_confirmations(&buffer_tx.compute_txid())
1384            .await?
1385            >= CET_NSEQUENCE
1386        {
1387            tracing::info!(
1388                "Buffer transaction for contract {} has enough confirmations to spend from it",
1389                serialize_hex(&contract_id)
1390            );
1391
1392            let confirmed_contract =
1393                get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
1394
1395            let (contract_info, adaptor_info, attestations) = self
1396                .get_closable_contract_info(&confirmed_contract)
1397                .await
1398                .ok_or_else(|| {
1399                    Error::InvalidState("Could not get information to close contract".to_string())
1400                })?;
1401
1402            let (signed_cet, closed_channel) =
1403                crate::channel_updater::finalize_unilateral_close_settled_channel(
1404                    &self.secp,
1405                    &signed_channel,
1406                    &confirmed_contract,
1407                    contract_info,
1408                    &attestations,
1409                    adaptor_info,
1410                    &self.signer_provider,
1411                    is_initiator,
1412                )?;
1413
1414            let closed_contract = self
1415                .close_contract(
1416                    &confirmed_contract,
1417                    signed_cet,
1418                    attestations.iter().map(|x| &x.1).cloned().collect(),
1419                )
1420                .await?;
1421
1422            self.chain_monitor
1423                .lock()
1424                .unwrap()
1425                .cleanup_channel(signed_channel.channel_id);
1426
1427            self.store
1428                .upsert_channel(closed_channel, Some(closed_contract))?;
1429        }
1430
1431        Ok(())
1432    }
1433
1434    fn on_offer_channel(
1435        &self,
1436        offer_channel: &OfferChannel,
1437        counter_party: PublicKey,
1438    ) -> Result<(), Error> {
1439        offer_channel.validate(
1440            &self.secp,
1441            REFUND_DELAY,
1442            REFUND_DELAY * 2,
1443            CET_NSEQUENCE,
1444            CET_NSEQUENCE * 2,
1445        )?;
1446
1447        let keys_id = self
1448            .signer_provider
1449            .derive_signer_key_id(false, offer_channel.temporary_contract_id);
1450        let (channel, contract) =
1451            OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
1452
1453        contract.validate()?;
1454
1455        if self
1456            .store
1457            .get_channel(&channel.temporary_channel_id)?
1458            .is_some()
1459        {
1460            return Err(Error::InvalidParameters(
1461                "Channel with identical idea already in store".to_string(),
1462            ));
1463        }
1464
1465        self.store
1466            .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))?;
1467
1468        Ok(())
1469    }
1470
1471    fn on_accept_channel(
1472        &self,
1473        accept_channel: &AcceptChannel,
1474        peer_id: &PublicKey,
1475    ) -> Result<SignChannel, Error> {
1476        let offered_channel = get_channel_in_state!(
1477            self,
1478            &accept_channel.temporary_channel_id,
1479            Offered,
1480            Some(*peer_id)
1481        )?;
1482        let offered_contract = get_contract_in_state!(
1483            self,
1484            &offered_channel.offered_contract_id,
1485            Offered,
1486            Some(*peer_id)
1487        )?;
1488
1489        let (signed_channel, signed_contract, sign_channel) = {
1490            let res = crate::channel_updater::verify_and_sign_accepted_channel(
1491                &self.secp,
1492                &offered_channel,
1493                &offered_contract,
1494                accept_channel,
1495                //TODO(tibo): this should be parameterizable.
1496                CET_NSEQUENCE,
1497                &self.wallet,
1498                &self.signer_provider,
1499                &self.chain_monitor,
1500            );
1501
1502            match res {
1503                Ok(res) => res,
1504                Err(e) => {
1505                    let channel = crate::channel::FailedAccept {
1506                        temporary_channel_id: accept_channel.temporary_channel_id,
1507                        error_message: format!("Error validating accept channel: {}", e),
1508                        accept_message: accept_channel.clone(),
1509                        counter_party: *peer_id,
1510                    };
1511                    self.store
1512                        .upsert_channel(Channel::FailedAccept(channel), None)?;
1513                    return Err(e);
1514                }
1515            }
1516        };
1517
1518        self.wallet.import_address(&Address::p2wsh(
1519            &signed_contract
1520                .accepted_contract
1521                .dlc_transactions
1522                .funding_script_pubkey,
1523            self.blockchain.get_network()?,
1524        ))?;
1525
1526        if let SignedChannelState::Established {
1527            buffer_transaction, ..
1528        } = &signed_channel.state
1529        {
1530            self.chain_monitor.lock().unwrap().add_tx(
1531                buffer_transaction.compute_txid(),
1532                ChannelInfo {
1533                    channel_id: signed_channel.channel_id,
1534                    tx_type: TxType::BufferTx,
1535                },
1536            );
1537        } else {
1538            unreachable!();
1539        }
1540
1541        self.store.upsert_channel(
1542            Channel::Signed(signed_channel),
1543            Some(Contract::Signed(signed_contract)),
1544        )?;
1545
1546        self.store
1547            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
1548
1549        Ok(sign_channel)
1550    }
1551
1552    async fn on_sign_channel(
1553        &self,
1554        sign_channel: &SignChannel,
1555        peer_id: &PublicKey,
1556    ) -> Result<(), Error> {
1557        let accepted_channel =
1558            get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
1559        let accepted_contract = get_contract_in_state!(
1560            self,
1561            &accepted_channel.accepted_contract_id,
1562            Accepted,
1563            Some(*peer_id)
1564        )?;
1565
1566        let (signed_channel, signed_contract, signed_fund_tx) = {
1567            let res = verify_signed_channel(
1568                &self.secp,
1569                &accepted_channel,
1570                &accepted_contract,
1571                sign_channel,
1572                &self.wallet,
1573                &self.chain_monitor,
1574            );
1575
1576            match res {
1577                Ok(res) => res,
1578                Err(e) => {
1579                    let channel = crate::channel::FailedSign {
1580                        channel_id: sign_channel.channel_id,
1581                        error_message: format!("Error validating accept channel: {}", e),
1582                        sign_message: sign_channel.clone(),
1583                        counter_party: *peer_id,
1584                    };
1585                    self.store
1586                        .upsert_channel(Channel::FailedSign(channel), None)?;
1587                    return Err(e);
1588                }
1589            }
1590        };
1591
1592        if let SignedChannelState::Established {
1593            buffer_transaction, ..
1594        } = &signed_channel.state
1595        {
1596            self.chain_monitor.lock().unwrap().add_tx(
1597                buffer_transaction.compute_txid(),
1598                ChannelInfo {
1599                    channel_id: signed_channel.channel_id,
1600                    tx_type: TxType::BufferTx,
1601                },
1602            );
1603        } else {
1604            unreachable!();
1605        }
1606
1607        self.blockchain.send_transaction(&signed_fund_tx).await?;
1608
1609        self.store.upsert_channel(
1610            Channel::Signed(signed_channel),
1611            Some(Contract::Signed(signed_contract)),
1612        )?;
1613        self.store
1614            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
1615
1616        Ok(())
1617    }
1618
1619    fn on_settle_offer(
1620        &self,
1621        settle_offer: &SettleOffer,
1622        peer_id: &PublicKey,
1623    ) -> Result<Option<Reject>, Error> {
1624        let mut signed_channel =
1625            get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
1626
1627        if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
1628            return Ok(Some(Reject {
1629                channel_id: settle_offer.channel_id,
1630            }));
1631        }
1632
1633        crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
1634
1635        self.store
1636            .upsert_channel(Channel::Signed(signed_channel), None)?;
1637
1638        Ok(None)
1639    }
1640
1641    fn on_settle_accept(
1642        &self,
1643        settle_accept: &SettleAccept,
1644        peer_id: &PublicKey,
1645    ) -> Result<SettleConfirm, Error> {
1646        let mut signed_channel =
1647            get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
1648
1649        let msg = crate::channel_updater::settle_channel_confirm(
1650            &self.secp,
1651            &mut signed_channel,
1652            settle_accept,
1653            CET_NSEQUENCE,
1654            0,
1655            PEER_TIMEOUT,
1656            &self.signer_provider,
1657            &self.time,
1658            &self.chain_monitor,
1659        )?;
1660
1661        self.store
1662            .upsert_channel(Channel::Signed(signed_channel), None)?;
1663
1664        Ok(msg)
1665    }
1666
1667    fn on_settle_confirm(
1668        &self,
1669        settle_confirm: &SettleConfirm,
1670        peer_id: &PublicKey,
1671    ) -> Result<SettleFinalize, Error> {
1672        let mut signed_channel =
1673            get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
1674        let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
1675        let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
1676            signed_channel,
1677            Established,
1678            buffer_transaction,
1679            own_buffer_adaptor_signature,
1680            is_offer,
1681            signed_contract_id
1682        )?;
1683
1684        let prev_buffer_txid = prev_buffer_tx.compute_txid();
1685        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
1686        let is_offer = *is_offer;
1687        let signed_contract_id = *signed_contract_id;
1688
1689        let msg = crate::channel_updater::settle_channel_finalize(
1690            &self.secp,
1691            &mut signed_channel,
1692            settle_confirm,
1693            &self.signer_provider,
1694        )?;
1695
1696        self.chain_monitor.lock().unwrap().add_tx(
1697            prev_buffer_txid,
1698            ChannelInfo {
1699                channel_id: signed_channel.channel_id,
1700                tx_type: TxType::Revoked {
1701                    update_idx: signed_channel.update_idx + 1,
1702                    own_adaptor_signature: own_buffer_adaptor_signature,
1703                    is_offer,
1704                    revoked_tx_type: RevokedTxType::Buffer,
1705                },
1706            },
1707        );
1708
1709        let closed_contract = Contract::Closed(self.get_collaboratively_closed_contract(
1710            &signed_contract_id,
1711            own_payout,
1712            true,
1713        )?);
1714
1715        self.store
1716            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))?;
1717        self.store
1718            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
1719
1720        Ok(msg)
1721    }
1722
1723    fn on_settle_finalize(
1724        &self,
1725        settle_finalize: &SettleFinalize,
1726        peer_id: &PublicKey,
1727    ) -> Result<(), Error> {
1728        let mut signed_channel =
1729            get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
1730        let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
1731        let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
1732            signed_channel,
1733            Established,
1734            buffer_transaction,
1735            own_buffer_adaptor_signature,
1736            is_offer,
1737            signed_contract_id
1738        )?;
1739
1740        let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
1741        let is_offer = *is_offer;
1742        let buffer_txid = buffer_tx.compute_txid();
1743        let signed_contract_id = *signed_contract_id;
1744
1745        crate::channel_updater::settle_channel_on_finalize(
1746            &self.secp,
1747            &mut signed_channel,
1748            settle_finalize,
1749        )?;
1750
1751        self.chain_monitor.lock().unwrap().add_tx(
1752            buffer_txid,
1753            ChannelInfo {
1754                channel_id: signed_channel.channel_id,
1755                tx_type: TxType::Revoked {
1756                    update_idx: signed_channel.update_idx + 1,
1757                    own_adaptor_signature: own_buffer_adaptor_signature,
1758                    is_offer,
1759                    revoked_tx_type: RevokedTxType::Buffer,
1760                },
1761            },
1762        );
1763
1764        let closed_contract = Contract::Closed(self.get_collaboratively_closed_contract(
1765            &signed_contract_id,
1766            own_payout,
1767            true,
1768        )?);
1769        self.store
1770            .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))?;
1771        self.store
1772            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
1773
1774        Ok(())
1775    }
1776
1777    fn on_renew_offer(
1778        &self,
1779        renew_offer: &RenewOffer,
1780        peer_id: &PublicKey,
1781    ) -> Result<Option<Reject>, Error> {
1782        let mut signed_channel =
1783            get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
1784
1785        // Received a renew offer when we already sent one, we reject it.
1786        if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
1787            if is_offer {
1788                return Ok(Some(Reject {
1789                    channel_id: renew_offer.channel_id,
1790                }));
1791            }
1792        }
1793
1794        let offered_contract = crate::channel_updater::on_renew_offer(
1795            &mut signed_channel,
1796            renew_offer,
1797            PEER_TIMEOUT,
1798            &self.time,
1799        )?;
1800
1801        self.store.create_contract(&offered_contract)?;
1802        self.store
1803            .upsert_channel(Channel::Signed(signed_channel), None)?;
1804
1805        Ok(None)
1806    }
1807
1808    fn on_renew_accept(
1809        &self,
1810        renew_accept: &RenewAccept,
1811        peer_id: &PublicKey,
1812    ) -> Result<RenewConfirm, Error> {
1813        let mut signed_channel =
1814            get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
1815        let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1816            Error::InvalidState(
1817                "Expected to be in a state with an associated contract id but was not.".to_string(),
1818            )
1819        })?;
1820
1821        let offered_contract =
1822            get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
1823
1824        let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
1825            &self.secp,
1826            renew_accept,
1827            &mut signed_channel,
1828            &offered_contract,
1829            CET_NSEQUENCE,
1830            PEER_TIMEOUT,
1831            &self.wallet,
1832            &self.signer_provider,
1833            &self.time,
1834        )?;
1835
1836        // Directly confirmed as we're in a channel the fund tx is already confirmed.
1837        self.store.upsert_channel(
1838            Channel::Signed(signed_channel),
1839            Some(Contract::Confirmed(signed_contract)),
1840        )?;
1841
1842        Ok(msg)
1843    }
1844
1845    fn on_renew_confirm(
1846        &self,
1847        renew_confirm: &RenewConfirm,
1848        peer_id: &PublicKey,
1849    ) -> Result<RenewFinalize, Error> {
1850        let mut signed_channel =
1851            get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
1852        let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
1853        let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1854            Error::InvalidState(
1855                "Expected to be in a state with an associated contract id but was not.".to_string(),
1856            )
1857        })?;
1858
1859        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
1860            .roll_back_state
1861            .as_ref()
1862            .expect("to have a rollback state")
1863        {
1864            SignedChannelState::Established {
1865                own_buffer_adaptor_signature,
1866                buffer_transaction,
1867                signed_contract_id,
1868                ..
1869            } => {
1870                let closed_contract = Contract::Closed(self.get_collaboratively_closed_contract(
1871                    signed_contract_id,
1872                    *own_payout,
1873                    true,
1874                )?);
1875                (
1876                    TxType::Revoked {
1877                        update_idx: signed_channel.update_idx,
1878                        own_adaptor_signature: *own_buffer_adaptor_signature,
1879                        is_offer: false,
1880                        revoked_tx_type: RevokedTxType::Buffer,
1881                    },
1882                    buffer_transaction.compute_txid(),
1883                    Some(closed_contract),
1884                )
1885            }
1886            SignedChannelState::Settled {
1887                settle_tx,
1888                own_settle_adaptor_signature,
1889                ..
1890            } => (
1891                TxType::Revoked {
1892                    update_idx: signed_channel.update_idx,
1893                    own_adaptor_signature: *own_settle_adaptor_signature,
1894                    is_offer: false,
1895                    revoked_tx_type: RevokedTxType::Settle,
1896                },
1897                settle_tx.compute_txid(),
1898                None,
1899            ),
1900            s => {
1901                return Err(Error::InvalidState(format!(
1902                    "Expected rollback state Established or Revoked but found {s:?}"
1903                )))
1904            }
1905        };
1906        let accepted_contract =
1907            get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
1908
1909        let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
1910            &self.secp,
1911            &mut signed_channel,
1912            &accepted_contract,
1913            renew_confirm,
1914            PEER_TIMEOUT,
1915            &self.time,
1916            &self.wallet,
1917            &self.signer_provider,
1918            &self.chain_monitor,
1919        )?;
1920
1921        self.chain_monitor.lock().unwrap().add_tx(
1922            prev_tx_id,
1923            ChannelInfo {
1924                channel_id: signed_channel.channel_id,
1925                tx_type,
1926            },
1927        );
1928
1929        // Directly confirmed as we're in a channel the fund tx is already confirmed.
1930        self.store.upsert_channel(
1931            Channel::Signed(signed_channel),
1932            Some(Contract::Confirmed(signed_contract)),
1933        )?;
1934
1935        self.store
1936            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
1937
1938        if let Some(closed_contract) = closed_contract {
1939            self.store.update_contract(&closed_contract)?;
1940        }
1941
1942        Ok(msg)
1943    }
1944
1945    fn on_renew_finalize(
1946        &self,
1947        renew_finalize: &RenewFinalize,
1948        peer_id: &PublicKey,
1949    ) -> Result<RenewRevoke, Error> {
1950        let mut signed_channel =
1951            get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
1952        let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
1953
1954        let (tx_type, prev_tx_id, closed_contract) = match signed_channel
1955            .roll_back_state
1956            .as_ref()
1957            .expect("to have a rollback state")
1958        {
1959            SignedChannelState::Established {
1960                own_buffer_adaptor_signature,
1961                buffer_transaction,
1962                signed_contract_id,
1963                ..
1964            } => {
1965                let closed_contract = self.get_collaboratively_closed_contract(
1966                    signed_contract_id,
1967                    *own_payout,
1968                    true,
1969                )?;
1970                (
1971                    TxType::Revoked {
1972                        update_idx: signed_channel.update_idx,
1973                        own_adaptor_signature: *own_buffer_adaptor_signature,
1974                        is_offer: false,
1975                        revoked_tx_type: RevokedTxType::Buffer,
1976                    },
1977                    buffer_transaction.compute_txid(),
1978                    Some(Contract::Closed(closed_contract)),
1979                )
1980            }
1981            SignedChannelState::Settled {
1982                settle_tx,
1983                own_settle_adaptor_signature,
1984                ..
1985            } => (
1986                TxType::Revoked {
1987                    update_idx: signed_channel.update_idx,
1988                    own_adaptor_signature: *own_settle_adaptor_signature,
1989                    is_offer: false,
1990                    revoked_tx_type: RevokedTxType::Settle,
1991                },
1992                settle_tx.compute_txid(),
1993                None,
1994            ),
1995            s => {
1996                return Err(Error::InvalidState(format!(
1997                    "Expected rollback state of Established or Settled but was {s:?}"
1998                )))
1999            }
2000        };
2001
2002        let msg = crate::channel_updater::renew_channel_on_finalize(
2003            &self.secp,
2004            &mut signed_channel,
2005            renew_finalize,
2006            &self.signer_provider,
2007        )?;
2008
2009        self.chain_monitor.lock().unwrap().add_tx(
2010            prev_tx_id,
2011            ChannelInfo {
2012                channel_id: signed_channel.channel_id,
2013                tx_type,
2014            },
2015        );
2016
2017        let buffer_tx =
2018            get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2019
2020        self.chain_monitor.lock().unwrap().add_tx(
2021            buffer_tx.compute_txid(),
2022            ChannelInfo {
2023                channel_id: signed_channel.channel_id,
2024                tx_type: TxType::BufferTx,
2025            },
2026        );
2027
2028        self.store
2029            .upsert_channel(Channel::Signed(signed_channel), None)?;
2030        self.store
2031            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
2032
2033        if let Some(closed_contract) = closed_contract {
2034            self.store.update_contract(&closed_contract)?;
2035        }
2036
2037        Ok(msg)
2038    }
2039
2040    fn on_renew_revoke(
2041        &self,
2042        renew_revoke: &RenewRevoke,
2043        peer_id: &PublicKey,
2044    ) -> Result<(), Error> {
2045        let mut signed_channel =
2046            get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2047
2048        crate::channel_updater::renew_channel_on_revoke(
2049            &self.secp,
2050            &mut signed_channel,
2051            renew_revoke,
2052        )?;
2053
2054        self.store
2055            .upsert_channel(Channel::Signed(signed_channel), None)
2056    }
2057
2058    fn on_collaborative_close_offer(
2059        &self,
2060        close_offer: &CollaborativeCloseOffer,
2061        peer_id: &PublicKey,
2062    ) -> Result<(), Error> {
2063        let mut signed_channel =
2064            get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2065
2066        crate::channel_updater::on_collaborative_close_offer(
2067            &mut signed_channel,
2068            close_offer,
2069            PEER_TIMEOUT,
2070            &self.time,
2071        )?;
2072
2073        self.store
2074            .upsert_channel(Channel::Signed(signed_channel), None)?;
2075
2076        Ok(())
2077    }
2078
2079    fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2080        let channel = self.store.get_channel(&reject.channel_id)?;
2081
2082        if let Some(channel) = channel {
2083            if channel.get_counter_party_id() != *counter_party {
2084                return Err(Error::InvalidParameters(format!(
2085                    "Peer {:02x?} is not involved with {} {:02x?}.",
2086                    counter_party,
2087                    stringify!(Channel),
2088                    channel.get_id()
2089                )));
2090            }
2091            match channel {
2092                Channel::Offered(offered_channel) => {
2093                    let offered_contract = get_contract_in_state!(
2094                        self,
2095                        &offered_channel.offered_contract_id,
2096                        Offered,
2097                        None as Option<PublicKey>
2098                    )?;
2099                    let utxos = offered_contract
2100                        .funding_inputs
2101                        .iter()
2102                        .map(|funding_input| {
2103                            let txid = Transaction::consensus_decode(
2104                                &mut funding_input.prev_tx.as_slice(),
2105                            )
2106                            .expect("Transaction Decode Error")
2107                            .compute_txid();
2108                            let vout = funding_input.prev_tx_vout;
2109                            OutPoint { txid, vout }
2110                        })
2111                        .collect::<Vec<_>>();
2112
2113                    self.wallet.unreserve_utxos(&utxos)?;
2114
2115                    // remove rejected channel, since nothing has been confirmed on chain yet.
2116                    self.store.upsert_channel(
2117                        Channel::Cancelled(offered_channel),
2118                        Some(Contract::Rejected(offered_contract)),
2119                    )?;
2120                }
2121                Channel::Signed(mut signed_channel) => {
2122                    let contract = match signed_channel.state {
2123                        SignedChannelState::RenewOffered {
2124                            offered_contract_id,
2125                            ..
2126                        } => {
2127                            let offered_contract = get_contract_in_state!(
2128                                self,
2129                                &offered_contract_id,
2130                                Offered,
2131                                None::<PublicKey>
2132                            )?;
2133                            Some(Contract::Rejected(offered_contract))
2134                        }
2135                        _ => None,
2136                    };
2137
2138                    crate::channel_updater::on_reject(&mut signed_channel)?;
2139
2140                    self.store
2141                        .upsert_channel(Channel::Signed(signed_channel), contract)?;
2142                }
2143                channel => {
2144                    return Err(Error::InvalidState(format!(
2145                        "Not in a state adequate to receive a reject message. {:?}",
2146                        channel
2147                    )))
2148                }
2149            }
2150        } else {
2151            tracing::warn!(
2152                "Couldn't find rejected dlc channel with id: {}",
2153                reject.channel_id.to_lower_hex_string()
2154            );
2155        }
2156
2157        Ok(())
2158    }
2159
2160    async fn channel_checks(&self) -> Result<(), Error> {
2161        let established_closing_channels = self
2162            .store
2163            .get_signed_channels(Some(SignedChannelStateType::Closing))?;
2164
2165        for channel in established_closing_channels {
2166            if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2167                tracing::error!("Error trying to close established channel: {}", e);
2168            }
2169        }
2170
2171        if let Err(e) = self.check_for_timed_out_channels().await {
2172            tracing::error!("Error checking timed out channels {}", e);
2173        }
2174        self.check_for_watched_tx().await
2175    }
2176
    /// Invokes the `check_for_timed_out_channels!` macro once for each of the
    /// intermediate renew and settle channel states, in the order listed
    /// below, then returns `Ok(())`.
    ///
    /// NOTE(review): the macro is defined outside this view. Judging by its
    /// name it handles channels that stayed in the given state past a
    /// timeout, and it may return early with an error — confirm against the
    /// macro definition.
    async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
        // Renew protocol: offered, accepted, confirmed.
        check_for_timed_out_channels!(self, RenewOffered);
        check_for_timed_out_channels!(self, RenewAccepted);
        check_for_timed_out_channels!(self, RenewConfirmed);
        // Settle protocol: offered, accepted, confirmed.
        check_for_timed_out_channels!(self, SettledOffered);
        check_for_timed_out_channels!(self, SettledAccepted);
        check_for_timed_out_channels!(self, SettledConfirmed);

        Ok(())
    }
2187
2188    pub(crate) async fn process_watched_txs(
2189        &self,
2190        watched_txs: Vec<(Transaction, ChannelInfo)>,
2191    ) -> Result<(), Error> {
2192        for (tx, channel_info) in watched_txs {
2193            let mut signed_channel = match get_channel_in_state!(
2194                self,
2195                &channel_info.channel_id,
2196                Signed,
2197                None as Option<PublicKey>
2198            ) {
2199                Ok(c) => c,
2200                Err(e) => {
2201                    tracing::error!(
2202                        "Could not retrieve channel {:?}: {}",
2203                        channel_info.channel_id,
2204                        e
2205                    );
2206                    continue;
2207                }
2208            };
2209
2210            let persist = match channel_info.tx_type {
2211                TxType::BufferTx => {
2212                    // TODO(tibo): should only considered closed after some confirmations.
2213                    // Ideally should save previous state, and maybe restore in
2214                    // case of reorg, though if the counter party has sent the
2215                    // tx to close the channel it is unlikely that the tx will
2216                    // not be part of a future block.
2217
2218                    let contract_id = signed_channel
2219                        .get_contract_id()
2220                        .expect("to have a contract id");
2221                    let mut state = SignedChannelState::Closing {
2222                        buffer_transaction: tx.clone(),
2223                        is_initiator: false,
2224                        contract_id,
2225                        keys_id: signed_channel.keys_id().ok_or_else(|| {
2226                            Error::InvalidState("Expected to have keys_id.".to_string())
2227                        })?,
2228                    };
2229                    std::mem::swap(&mut signed_channel.state, &mut state);
2230
2231                    signed_channel.roll_back_state = Some(state);
2232
2233                    self.store
2234                        .upsert_channel(Channel::Signed(signed_channel), None)?;
2235
2236                    false
2237                }
2238                TxType::Revoked {
2239                    update_idx,
2240                    own_adaptor_signature,
2241                    is_offer,
2242                    revoked_tx_type,
2243                } => {
2244                    let secret = signed_channel
2245                        .counter_party_commitment_secrets
2246                        .get_secret(update_idx)
2247                        .expect("to be able to retrieve the per update secret");
2248                    let counter_per_update_secret = SecretKey::from_slice(&secret)
2249                        .expect("to be able to parse the counter per update secret.");
2250
2251                    let per_update_seed_pk = signed_channel.own_per_update_seed;
2252
2253                    let per_update_seed_sk = self
2254                        .signer_provider
2255                        .get_secret_key_for_pubkey(&per_update_seed_pk)?;
2256
2257                    let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
2258                        per_update_seed_sk.as_ref(),
2259                        update_idx,
2260                    ))
2261                    .expect("a valid secret key.");
2262
2263                    let per_update_point =
2264                        PublicKey::from_secret_key(&self.secp, &per_update_secret);
2265
2266                    let own_revocation_params = signed_channel.own_points.get_revokable_params(
2267                        &self.secp,
2268                        &signed_channel.counter_points.revocation_basepoint,
2269                        &per_update_point,
2270                    );
2271
2272                    let counter_per_update_point =
2273                        PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
2274
2275                    let base_own_sk = self
2276                        .signer_provider
2277                        .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
2278
2279                    let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
2280
2281                    let counter_revocation_params =
2282                        signed_channel.counter_points.get_revokable_params(
2283                            &self.secp,
2284                            &signed_channel.own_points.revocation_basepoint,
2285                            &counter_per_update_point,
2286                        );
2287
2288                    let witness = if signed_channel.own_params.fund_pubkey
2289                        < signed_channel.counter_params.fund_pubkey
2290                    {
2291                        tx.input[0].witness.to_vec().remove(1)
2292                    } else {
2293                        tx.input[0].witness.to_vec().remove(2)
2294                    };
2295
2296                    let sig_data = witness
2297                        .iter()
2298                        .take(witness.len() - 1)
2299                        .cloned()
2300                        .collect::<Vec<_>>();
2301                    let own_sig = Signature::from_der(&sig_data)?;
2302
2303                    let counter_sk = own_adaptor_signature.recover(
2304                        &self.secp,
2305                        &own_sig,
2306                        &counter_revocation_params.publish_pk.inner,
2307                    )?;
2308
2309                    let own_revocation_base_secret =
2310                        &self.signer_provider.get_secret_key_for_pubkey(
2311                            &signed_channel.own_points.revocation_basepoint,
2312                        )?;
2313
2314                    let counter_revocation_sk = derive_private_revocation_key(
2315                        &self.secp,
2316                        &counter_per_update_secret,
2317                        own_revocation_base_secret,
2318                    );
2319
2320                    let (offer_params, accept_params) = if is_offer {
2321                        (&own_revocation_params, &counter_revocation_params)
2322                    } else {
2323                        (&counter_revocation_params, &own_revocation_params)
2324                    };
2325
2326                    let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
2327                        lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
2328                    ) / 250)
2329                        .into();
2330
2331                    let signed_tx = match revoked_tx_type {
2332                        RevokedTxType::Buffer => {
2333                            dlc::channel::create_and_sign_punish_buffer_transaction(
2334                                &self.secp,
2335                                offer_params,
2336                                accept_params,
2337                                &own_sk,
2338                                &counter_sk,
2339                                &counter_revocation_sk,
2340                                &tx,
2341                                &self.wallet.get_new_address()?,
2342                                0,
2343                                fee_rate_per_vb,
2344                            )?
2345                        }
2346                        RevokedTxType::Settle => {
2347                            dlc::channel::create_and_sign_punish_settle_transaction(
2348                                &self.secp,
2349                                offer_params,
2350                                accept_params,
2351                                &own_sk,
2352                                &counter_sk,
2353                                &counter_revocation_sk,
2354                                &tx,
2355                                &self.wallet.get_new_address()?,
2356                                CET_NSEQUENCE,
2357                                0,
2358                                fee_rate_per_vb,
2359                                is_offer,
2360                            )?
2361                        }
2362                    };
2363
2364                    self.blockchain.send_transaction(&signed_tx).await?;
2365
2366                    let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
2367                        counter_party: signed_channel.counter_party,
2368                        temporary_channel_id: signed_channel.temporary_channel_id,
2369                        channel_id: signed_channel.channel_id,
2370                        punish_txid: signed_tx.compute_txid(),
2371                    });
2372
2373                    //TODO(tibo): should probably make sure the tx is confirmed somewhere before
2374                    //stop watching the cheating tx.
2375                    self.chain_monitor
2376                        .lock()
2377                        .unwrap()
2378                        .cleanup_channel(signed_channel.channel_id);
2379                    self.store.upsert_channel(closed_channel, None)?;
2380                    true
2381                }
2382                TxType::CollaborativeClose => {
2383                    if let Some(SignedChannelState::Established {
2384                        signed_contract_id, ..
2385                    }) = signed_channel.roll_back_state
2386                    {
2387                        let counter_payout = get_signed_channel_state!(
2388                            signed_channel,
2389                            CollaborativeCloseOffered,
2390                            counter_payout
2391                        )?;
2392                        let closed_contract = self.get_collaboratively_closed_contract(
2393                            &signed_contract_id,
2394                            *counter_payout,
2395                            false,
2396                        )?;
2397                        self.store
2398                            .update_contract(&Contract::Closed(closed_contract))?;
2399                    }
2400                    let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
2401                        counter_party: signed_channel.counter_party,
2402                        temporary_channel_id: signed_channel.temporary_channel_id,
2403                        channel_id: signed_channel.channel_id,
2404                    });
2405                    self.chain_monitor
2406                        .lock()
2407                        .unwrap()
2408                        .cleanup_channel(signed_channel.channel_id);
2409                    self.store.upsert_channel(closed_channel, None)?;
2410                    true
2411                }
2412                TxType::SettleTx => {
2413                    let closed_channel = Channel::CounterClosed(ClosedChannel {
2414                        counter_party: signed_channel.counter_party,
2415                        temporary_channel_id: signed_channel.temporary_channel_id,
2416                        channel_id: signed_channel.channel_id,
2417                    });
2418                    self.chain_monitor
2419                        .lock()
2420                        .unwrap()
2421                        .cleanup_channel(signed_channel.channel_id);
2422                    self.store.upsert_channel(closed_channel, None)?;
2423                    true
2424                }
2425                TxType::Cet => {
2426                    let contract_id = signed_channel.get_contract_id();
2427                    let closed_channel = {
2428                        match &signed_channel.state {
2429                            SignedChannelState::Closing { is_initiator, .. } => {
2430                                if *is_initiator {
2431                                    Channel::Closed(ClosedChannel {
2432                                        counter_party: signed_channel.counter_party,
2433                                        temporary_channel_id: signed_channel.temporary_channel_id,
2434                                        channel_id: signed_channel.channel_id,
2435                                    })
2436                                } else {
2437                                    Channel::CounterClosed(ClosedChannel {
2438                                        counter_party: signed_channel.counter_party,
2439                                        temporary_channel_id: signed_channel.temporary_channel_id,
2440                                        channel_id: signed_channel.channel_id,
2441                                    })
2442                                }
2443                            }
2444                            _ => {
2445                                tracing::error!("Saw spending of buffer transaction without being in closing state");
2446                                Channel::Closed(ClosedChannel {
2447                                    counter_party: signed_channel.counter_party,
2448                                    temporary_channel_id: signed_channel.temporary_channel_id,
2449                                    channel_id: signed_channel.channel_id,
2450                                })
2451                            }
2452                        }
2453                    };
2454
2455                    self.chain_monitor
2456                        .lock()
2457                        .unwrap()
2458                        .cleanup_channel(signed_channel.channel_id);
2459
2460                    let pre_closed_contract = contract_id
2461                        .map(|contract_id| {
2462                            self.store.get_contract(&contract_id).map(|contract| {
2463                                contract.map(|contract| match contract {
2464                                    Contract::Confirmed(signed_contract) => {
2465                                        Some(Contract::PreClosed(PreClosedContract {
2466                                            signed_contract,
2467                                            attestations: None,
2468                                            signed_cet: tx.clone(),
2469                                        }))
2470                                    }
2471                                    _ => None,
2472                                })
2473                            })
2474                        })
2475                        .transpose()?
2476                        .flatten()
2477                        .flatten();
2478
2479                    self.store
2480                        .upsert_channel(closed_channel, pre_closed_contract)?;
2481
2482                    true
2483                }
2484            };
2485
2486            if persist {
2487                self.store
2488                    .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
2489            }
2490        }
2491        Ok(())
2492    }
2493
2494    async fn check_for_watched_tx(&self) -> Result<(), Error> {
2495        let confirmed_txs = self.chain_monitor.lock().unwrap().confirmed_txs();
2496
2497        self.process_watched_txs(confirmed_txs).await?;
2498
2499        self.get_store()
2500            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
2501
2502        Ok(())
2503    }
2504
2505    async fn force_close_channel_internal(
2506        &self,
2507        mut channel: SignedChannel,
2508        is_initiator: bool,
2509    ) -> Result<(), Error> {
2510        match &channel.state {
2511            SignedChannelState::Established {
2512                counter_buffer_adaptor_signature,
2513                buffer_transaction,
2514                ..
2515            } => {
2516                let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
2517                let buffer_transaction = buffer_transaction.clone();
2518                self.initiate_unilateral_close_established_channel(
2519                    channel,
2520                    is_initiator,
2521                    counter_buffer_adaptor_signature,
2522                    buffer_transaction,
2523                )
2524                .await
2525            }
2526            SignedChannelState::RenewFinalized {
2527                buffer_transaction,
2528                offer_buffer_adaptor_signature,
2529                ..
2530            } => {
2531                let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
2532                let buffer_transaction = buffer_transaction.clone();
2533                self.initiate_unilateral_close_established_channel(
2534                    channel,
2535                    is_initiator,
2536                    offer_buffer_adaptor_signature,
2537                    buffer_transaction,
2538                )
2539                .await
2540            }
2541            SignedChannelState::Settled { .. } => {
2542                self.close_settled_channel(channel, is_initiator).await
2543            }
2544            SignedChannelState::SettledOffered { .. }
2545            | SignedChannelState::SettledReceived { .. }
2546            | SignedChannelState::SettledAccepted { .. }
2547            | SignedChannelState::SettledConfirmed { .. }
2548            | SignedChannelState::RenewOffered { .. }
2549            | SignedChannelState::RenewAccepted { .. }
2550            | SignedChannelState::RenewConfirmed { .. }
2551            | SignedChannelState::CollaborativeCloseOffered { .. } => {
2552                channel.state = channel
2553                    .roll_back_state
2554                    .take()
2555                    .expect("to have a rollback state");
2556                let channel_clone = channel.clone(); // Clone the channel to avoid moving it
2557                Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
2558            }
2559            SignedChannelState::Closing { .. } => Err(Error::InvalidState(
2560                "Channel is already closing.".to_string(),
2561            )),
2562        }
2563    }
2564
2565    /// Initiate the unilateral closing of a channel that has been established.
2566    async fn initiate_unilateral_close_established_channel(
2567        &self,
2568        mut signed_channel: SignedChannel,
2569        is_initiator: bool,
2570        buffer_adaptor_signature: EcdsaAdaptorSignature,
2571        buffer_transaction: Transaction,
2572    ) -> Result<(), Error> {
2573        let keys_id = signed_channel.keys_id().ok_or_else(|| {
2574            Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
2575        })?;
2576
2577        crate::channel_updater::initiate_unilateral_close_established_channel(
2578            &self.secp,
2579            &mut signed_channel,
2580            buffer_adaptor_signature,
2581            keys_id,
2582            buffer_transaction,
2583            &self.signer_provider,
2584            is_initiator,
2585        )?;
2586
2587        let buffer_transaction =
2588            get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
2589
2590        self.blockchain.send_transaction(buffer_transaction).await?;
2591
2592        self.chain_monitor
2593            .lock()
2594            .unwrap()
2595            .remove_tx(&buffer_transaction.compute_txid());
2596
2597        self.store
2598            .upsert_channel(Channel::Signed(signed_channel), None)?;
2599
2600        self.store
2601            .persist_chain_monitor(&self.chain_monitor.lock().unwrap())?;
2602
2603        Ok(())
2604    }
2605
2606    /// Unilaterally close a channel that has been settled.
2607    async fn close_settled_channel(
2608        &self,
2609        signed_channel: SignedChannel,
2610        is_initiator: bool,
2611    ) -> Result<(), Error> {
2612        let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
2613            &self.secp,
2614            &signed_channel,
2615            &self.signer_provider,
2616            is_initiator,
2617        )?;
2618
2619        if self
2620            .blockchain
2621            .get_transaction_confirmations(&settle_tx.compute_txid())
2622            .await
2623            .unwrap_or(0)
2624            == 0
2625        {
2626            self.blockchain.send_transaction(&settle_tx).await?;
2627        }
2628
2629        self.chain_monitor
2630            .lock()
2631            .unwrap()
2632            .cleanup_channel(signed_channel.channel_id);
2633
2634        self.store.upsert_channel(closed_channel, None)?;
2635
2636        Ok(())
2637    }
2638
2639    fn get_collaboratively_closed_contract(
2640        &self,
2641        contract_id: &ContractId,
2642        payout: u64,
2643        is_own_payout: bool,
2644    ) -> Result<ClosedContract, Error> {
2645        let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
2646        let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
2647            contract
2648                .accepted_contract
2649                .offered_contract
2650                .offer_params
2651                .collateral
2652        } else {
2653            contract.accepted_contract.accept_params.collateral
2654        };
2655        let own_payout = if is_own_payout {
2656            payout
2657        } else {
2658            contract.accepted_contract.offered_contract.total_collateral - payout
2659        };
2660        let pnl = own_payout as i64 - own_collateral as i64;
2661        Ok(ClosedContract {
2662            attestations: None,
2663            signed_cet: None,
2664            contract_id: *contract_id,
2665            temporary_contract_id: contract.accepted_contract.offered_contract.id,
2666            counter_party_id: contract.accepted_contract.offered_contract.counter_party,
2667            pnl,
2668        })
2669    }
2670
2671    async fn oracle_announcements(
2672        &self,
2673        contract_input: &ContractInput,
2674    ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
2675        let announcements = stream::iter(contract_input.contract_infos.iter())
2676            .map(|x| {
2677                let future = self.get_oracle_announcements(&x.oracles);
2678                async move {
2679                    match future.await {
2680                        Ok(result) => Ok(result),
2681                        Err(e) => {
2682                            tracing::error!("Failed to get oracle announcements: {}", e);
2683                            Err(e)
2684                        }
2685                    }
2686                }
2687            })
2688            .collect::<FuturesUnordered<_>>()
2689            .await
2690            .try_collect::<Vec<_>>()
2691            .await?;
2692        Ok(announcements)
2693    }
2694}