1use super::{
4 Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13 accepted_contract::AcceptedContract, contract_info::ContractInfo,
14 contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15 signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16 FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::Address;
26use bitcoin::{OutPoint, Transaction};
27use dlc_messages::channel::{
28 AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
29 RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
30 SettleOffer, SignChannel,
31};
32use dlc_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
33use dlc_messages::{AcceptDlc, Message as DlcMessage, OfferDlc, SignDlc};
34use futures::stream;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt};
37use hex::DisplayHex;
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40 build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use secp256k1_zkp::XOnlyPublicKey;
43use secp256k1_zkp::{
44 ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
45};
46use std::collections::HashMap;
47use std::ops::Deref;
48use std::string::ToString;
49use std::sync::Arc;
50use tokio::sync::Mutex;
51
52pub const NB_CONFIRMATIONS: u32 = 6;
54pub const REFUND_DELAY: u32 = 86400 * 7;
56pub const CET_NSEQUENCE: u32 = 288;
58pub const PEER_TIMEOUT: u64 = 3600;
61
62type ClosableContractInfo<'a> = Option<(
63 &'a ContractInfo,
64 &'a AdaptorInfo,
65 Vec<(usize, OracleAttestation)>,
66)>;
67
68pub struct Manager<
70 W: Deref,
71 SP: Deref,
72 B: Deref,
73 S: Deref,
74 O: Deref,
75 T: Deref,
76 F: Deref,
77 X: ContractSigner,
78> where
79 W::Target: Wallet,
80 SP::Target: ContractSignerProvider<Signer = X>,
81 B::Target: Blockchain,
82 S::Target: Storage,
83 O::Target: Oracle,
84 T::Target: Time,
85 F::Target: FeeEstimator,
86{
87 oracles: HashMap<XOnlyPublicKey, O>,
88 wallet: W,
89 signer_provider: SP,
90 blockchain: B,
91 store: S,
92 secp: Secp256k1<All>,
93 chain_monitor: Mutex<ChainMonitor>,
94 time: T,
95 fee_estimator: F,
96}
97
98macro_rules! get_contract_in_state {
99 ($manager: ident, $contract_id: expr, $state: ident, $peer_id: expr) => {{
100 get_object_in_state!(
101 $manager,
102 $contract_id,
103 $state,
104 $peer_id,
105 Contract,
106 get_contract
107 )
108 }};
109}
110
111macro_rules! get_channel_in_state {
112 ($manager: ident, $channel_id: expr, $state: ident, $peer_id: expr) => {{
113 get_object_in_state!(
114 $manager,
115 $channel_id,
116 $state,
117 $peer_id,
118 Channel,
119 get_channel
120 )
121 }};
122}
123
124macro_rules! get_signed_channel_rollback_state {
125 ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
126 match $signed_channel.roll_back_state.as_ref() {
127 Some(SignedChannelState::$state{$($field,)* ..}) => Ok(($($field,)*)),
128 _ => Err(Error::InvalidState(format!("Expected rollback state {} got {:?}", stringify!($state), $signed_channel.state))),
129 }
130 }};
131}
132
133macro_rules! check_for_timed_out_channels {
134 ($manager: ident, $state: ident) => {
135 let channels = $manager
136 .store
137 .get_signed_channels(Some(SignedChannelStateType::$state))
138 .await?;
139
140 for channel in channels {
141 if let SignedChannelState::$state { timeout, .. } = channel.state {
142 let is_timed_out = timeout < $manager.time.unix_time_now();
143 if is_timed_out {
144 match $manager.force_close_channel_internal(channel, true).await {
145 Err(e) => tracing::error!("Error force closing channel {}", e),
146 _ => {}
147 }
148 }
149 }
150 }
151 };
152}
153
154impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
155 Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
156where
157 W::Target: Wallet,
158 SP::Target: ContractSignerProvider<Signer = X>,
159 B::Target: Blockchain,
160 S::Target: Storage,
161 O::Target: Oracle,
162 T::Target: Time,
163 F::Target: FeeEstimator,
164{
165 pub async fn new(
167 wallet: W,
168 signer_provider: SP,
169 blockchain: B,
170 store: S,
171 oracles: HashMap<XOnlyPublicKey, O>,
172 time: T,
173 fee_estimator: F,
174 ) -> Result<Self, Error> {
175 let init_height = blockchain.get_blockchain_height().await?;
176 let chain_monitor = Mutex::new(
177 store
178 .get_chain_monitor()
179 .await?
180 .unwrap_or(ChainMonitor::new(init_height)),
181 );
182
183 let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
184
185 Ok(Manager {
186 secp: secp256k1_zkp::Secp256k1::new(),
187 wallet,
188 signer_provider,
189 blockchain,
190 store,
191 oracles,
192 time,
193 fee_estimator,
194 chain_monitor,
195 })
196 }
197
198 pub fn get_store(&self) -> &S {
200 &self.store
201 }
202
203 pub async fn on_dlc_message(
205 &self,
206 msg: &DlcMessage,
207 counter_party: PublicKey,
208 ) -> Result<Option<DlcMessage>, Error> {
209 match msg {
210 DlcMessage::Offer(o) => {
211 self.on_offer_message(o, counter_party).await?;
212 Ok(None)
213 }
214 DlcMessage::Accept(a) => Ok(Some(self.on_accept_message(a, &counter_party).await?)),
215 DlcMessage::Sign(s) => {
216 self.on_sign_message(s, &counter_party).await?;
217 Ok(None)
218 }
219 DlcMessage::OfferChannel(o) => {
220 self.on_offer_channel(o, counter_party).await?;
221 Ok(None)
222 }
223 DlcMessage::AcceptChannel(a) => Ok(Some(DlcMessage::SignChannel(
224 self.on_accept_channel(a, &counter_party).await?,
225 ))),
226 DlcMessage::SignChannel(s) => {
227 self.on_sign_channel(s, &counter_party).await?;
228 Ok(None)
229 }
230 DlcMessage::SettleOffer(s) => match self.on_settle_offer(s, &counter_party).await? {
231 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
232 None => Ok(None),
233 },
234 DlcMessage::SettleAccept(s) => Ok(Some(DlcMessage::SettleConfirm(
235 self.on_settle_accept(s, &counter_party).await?,
236 ))),
237 DlcMessage::SettleConfirm(s) => Ok(Some(DlcMessage::SettleFinalize(
238 self.on_settle_confirm(s, &counter_party).await?,
239 ))),
240 DlcMessage::SettleFinalize(s) => {
241 self.on_settle_finalize(s, &counter_party).await?;
242 Ok(None)
243 }
244 DlcMessage::RenewOffer(r) => match self.on_renew_offer(r, &counter_party).await? {
245 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
246 None => Ok(None),
247 },
248 DlcMessage::RenewAccept(r) => Ok(Some(DlcMessage::RenewConfirm(
249 self.on_renew_accept(r, &counter_party).await?,
250 ))),
251 DlcMessage::RenewConfirm(r) => Ok(Some(DlcMessage::RenewFinalize(
252 self.on_renew_confirm(r, &counter_party).await?,
253 ))),
254 DlcMessage::RenewFinalize(r) => {
255 let revoke = self.on_renew_finalize(r, &counter_party).await?;
256 Ok(Some(DlcMessage::RenewRevoke(revoke)))
257 }
258 DlcMessage::RenewRevoke(r) => {
259 self.on_renew_revoke(r, &counter_party).await?;
260 Ok(None)
261 }
262 DlcMessage::CollaborativeCloseOffer(c) => {
263 self.on_collaborative_close_offer(c, &counter_party).await?;
264 Ok(None)
265 }
266 DlcMessage::Reject(r) => {
267 self.on_reject(r, &counter_party).await?;
268 Ok(None)
269 }
270 }
271 }
272
273 pub async fn send_offer(
278 &self,
279 contract_input: &ContractInput,
280 counter_party: PublicKey,
281 ) -> Result<OfferDlc, Error> {
282 let oracle_announcements = self.oracle_announcements(contract_input).await?;
284
285 self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
286 .await
287 }
288
289 pub async fn send_offer_with_announcements(
295 &self,
296 contract_input: &ContractInput,
297 counter_party: PublicKey,
298 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
299 ) -> Result<OfferDlc, Error> {
300 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
301 &self.secp,
302 contract_input,
303 oracle_announcements,
304 REFUND_DELAY,
305 &counter_party,
306 &self.wallet,
307 &self.blockchain,
308 &self.time,
309 &self.signer_provider,
310 )
311 .await?;
312
313 offered_contract.validate()?;
314
315 self.store.create_contract(&offered_contract).await?;
316
317 Ok(offer_msg)
318 }
319
320 pub async fn accept_contract_offer(
322 &self,
323 contract_id: &ContractId,
324 ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
325 let offered_contract =
326 get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
327
328 let counter_party = offered_contract.counter_party;
329
330 let (accepted_contract, accept_msg) = accept_contract(
331 &self.secp,
332 &offered_contract,
333 &self.wallet,
334 &self.signer_provider,
335 &self.blockchain,
336 )
337 .await?;
338
339 self.wallet.import_address(&Address::p2wsh(
340 &accepted_contract.dlc_transactions.funding_script_pubkey,
341 self.blockchain.get_network()?,
342 ))?;
343
344 let contract_id = accepted_contract.get_contract_id();
345
346 self.store
347 .update_contract(&Contract::Accepted(accepted_contract))
348 .await?;
349
350 Ok((contract_id, counter_party, accept_msg))
351 }
352
353 pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
359 let cur_height = self.blockchain.get_blockchain_height().await?;
360 let last_height = self.chain_monitor.lock().await.last_height;
361
362 if cur_height < last_height {
365 return Err(Error::InvalidState(
366 "Current height is lower than last height.".to_string(),
367 ));
368 }
369
370 for height in last_height + 1..=cur_height {
371 let block = self.blockchain.get_block_at_height(height).await?;
372
373 self.chain_monitor
374 .lock()
375 .await
376 .process_block(&block, height);
377 }
378
379 Ok(())
380 }
381
382 pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
385 tracing::debug!("Periodic check.");
386 self.check_signed_contracts().await?;
387 self.check_confirmed_contracts().await?;
388 self.check_preclosed_contracts().await?;
389
390 if check_channels {
391 self.channel_checks().await?;
392 }
393
394 Ok(())
395 }
396
397 async fn on_offer_message(
398 &self,
399 offered_message: &OfferDlc,
400 counter_party: PublicKey,
401 ) -> Result<(), Error> {
402 offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
403 let keys_id = self
404 .signer_provider
405 .derive_signer_key_id(false, offered_message.temporary_contract_id);
406 let contract: OfferedContract =
407 OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
408 contract.validate()?;
409
410 if self.store.get_contract(&contract.id).await?.is_some() {
411 return Err(Error::InvalidParameters(
412 "Contract with identical id already exists".to_string(),
413 ));
414 }
415
416 self.store.create_contract(&contract).await?;
417
418 Ok(())
419 }
420
421 async fn on_accept_message(
422 &self,
423 accept_msg: &AcceptDlc,
424 counter_party: &PublicKey,
425 ) -> Result<DlcMessage, Error> {
426 let offered_contract = get_contract_in_state!(
427 self,
428 &accept_msg.temporary_contract_id,
429 Offered,
430 Some(*counter_party)
431 )?;
432
433 let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
434 &self.secp,
435 &offered_contract,
436 accept_msg,
437 &self.wallet,
438 &self.signer_provider,
439 ) {
440 Ok(contract) => contract,
441 Err(e) => {
442 return self
443 .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
444 .await
445 }
446 };
447
448 self.wallet.import_address(&Address::p2wsh(
449 &signed_contract
450 .accepted_contract
451 .dlc_transactions
452 .funding_script_pubkey,
453 self.blockchain.get_network()?,
454 ))?;
455
456 self.store
457 .update_contract(&Contract::Signed(signed_contract))
458 .await?;
459
460 Ok(DlcMessage::Sign(signed_msg))
461 }
462
463 async fn on_sign_message(
464 &self,
465 sign_message: &SignDlc,
466 peer_id: &PublicKey,
467 ) -> Result<(), Error> {
468 let accepted_contract =
469 get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
470
471 let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
472 &self.secp,
473 &accepted_contract,
474 sign_message,
475 &self.wallet,
476 ) {
477 Ok(contract) => contract,
478 Err(e) => {
479 return self
480 .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
481 .await
482 }
483 };
484
485 self.store
486 .update_contract(&Contract::Signed(signed_contract))
487 .await?;
488
489 self.blockchain.send_transaction(&fund_tx).await?;
490
491 Ok(())
492 }
493
494 async fn get_oracle_announcements(
495 &self,
496 oracle_inputs: &OracleInput,
497 ) -> Result<Vec<OracleAnnouncement>, Error> {
498 let mut announcements = Vec::new();
499 for pubkey in &oracle_inputs.public_keys {
500 let oracle = self
501 .oracles
502 .get(pubkey)
503 .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
504 let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
505 announcements.push(announcement);
506 }
507
508 Ok(announcements)
509 }
510
511 async fn sign_fail_on_error<R>(
512 &self,
513 accepted_contract: AcceptedContract,
514 sign_message: SignDlc,
515 e: Error,
516 ) -> Result<R, Error> {
517 tracing::error!("Error in on_sign {}", e);
518 self.store
519 .update_contract(&Contract::FailedSign(FailedSignContract {
520 accepted_contract,
521 sign_message,
522 error_message: e.to_string(),
523 }))
524 .await?;
525 Err(e)
526 }
527
528 async fn accept_fail_on_error<R>(
529 &self,
530 offered_contract: OfferedContract,
531 accept_message: AcceptDlc,
532 e: Error,
533 ) -> Result<R, Error> {
534 tracing::error!("Error in on_accept {}", e);
535 self.store
536 .update_contract(&Contract::FailedAccept(FailedAcceptContract {
537 offered_contract,
538 accept_message,
539 error_message: e.to_string(),
540 }))
541 .await?;
542 Err(e)
543 }
544
545 async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
546 let confirmations = self
547 .blockchain
548 .get_transaction_confirmations(
549 &contract
550 .accepted_contract
551 .dlc_transactions
552 .fund
553 .compute_txid(),
554 )
555 .await?;
556 if confirmations >= NB_CONFIRMATIONS {
557 tracing::info!(
558 confirmations,
559 contract_id = contract.accepted_contract.get_contract_id_string(),
560 "Marking contract as confirmed."
561 );
562 self.store
563 .update_contract(&Contract::Confirmed(contract.clone()))
564 .await?;
565 } else {
566 tracing::info!(
567 confirmations,
568 required = NB_CONFIRMATIONS,
569 contract_id = contract.accepted_contract.get_contract_id_string(),
570 "Not enough confirmations to mark contract as confirmed."
571 );
572 }
573 Ok(())
574 }
575
576 async fn check_signed_contracts(&self) -> Result<(), Error> {
577 for c in self.store.get_signed_contracts().await? {
578 if let Err(e) = self.check_signed_contract(&c).await {
579 tracing::error!(
580 "Error checking confirmed contract {}: {}",
581 c.accepted_contract.get_contract_id_string(),
582 e
583 )
584 }
585 }
586
587 Ok(())
588 }
589
590 async fn check_confirmed_contracts(&self) -> Result<(), Error> {
591 for c in self.store.get_confirmed_contracts().await? {
592 if c.channel_id.is_some() {
594 continue;
595 }
596 if let Err(e) = self.check_confirmed_contract(&c).await {
597 tracing::error!(
598 "Error checking confirmed contract {}: {}",
599 c.accepted_contract.get_contract_id_string(),
600 e
601 )
602 }
603 }
604
605 Ok(())
606 }
607
608 async fn get_closable_contract_info<'a>(
609 &'a self,
610 contract: &'a SignedContract,
611 ) -> ClosableContractInfo<'a> {
612 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
613 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
614 for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
615 let matured: Vec<_> = contract_info
616 .oracle_announcements
617 .iter()
618 .filter(|x| {
619 (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
620 })
621 .enumerate()
622 .collect();
623 if matured.len() >= contract_info.threshold {
624 let attestations = stream::iter(matured.iter())
625 .map(|(i, announcement)| async move {
626 let oracle = match self.oracles.get(&announcement.oracle_public_key) {
628 Some(oracle) => oracle,
629 None => {
630 tracing::debug!(
631 "Oracle not found for key: {}",
632 announcement.oracle_public_key
633 );
634 return None;
635 }
636 };
637
638 let attestation = match oracle
640 .get_attestation(&announcement.oracle_event.event_id)
641 .await
642 {
643 Ok(attestation) => attestation,
644 Err(e) => {
645 tracing::error!(
646 "Attestation not found for event. id={} error={}",
647 announcement.oracle_event.event_id,
648 e.to_string()
649 );
650 return None;
651 }
652 };
653
654 if let Err(e) = attestation.validate(&self.secp, announcement) {
656 tracing::error!(
657 "Oracle attestation is not valid. pubkey={} event_id={}, error={:?}",
658 announcement.oracle_public_key,
659 announcement.oracle_event.event_id,
660 e
661 );
662 return None;
663 }
664
665 Some((*i, attestation))
666 })
667 .collect::<FuturesUnordered<_>>()
668 .await
669 .filter_map(|result| async move { result }) .collect::<Vec<_>>()
671 .await;
672 if attestations.len() >= contract_info.threshold {
673 return Some((contract_info, adaptor_info, attestations));
674 }
675 }
676 }
677 None
678 }
679
680 async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
681 let closable_contract_info = self.get_closable_contract_info(contract).await;
682 if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
683 let offer = &contract.accepted_contract.offered_contract;
684 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
685
686 if let Ok(cet) = crate::contract_updater::get_signed_cet(
692 &self.secp,
693 contract,
694 contract_info,
695 adaptor_info,
696 &attestations,
697 &signer,
698 ) {
699 match self
700 .close_contract(
701 contract,
702 cet,
703 attestations.iter().map(|x| x.1.clone()).collect(),
704 )
705 .await
706 {
707 Ok(closed_contract) => {
708 self.store.update_contract(&closed_contract).await?;
709 return Ok(());
710 }
711 Err(e) => {
712 tracing::warn!(
713 "Failed to close contract {}: {}",
714 contract.accepted_contract.get_contract_id_string(),
715 e
716 );
717 return Err(e);
718 }
719 }
720 }
721 }
722
723 self.check_refund(contract).await?;
724
725 Ok(())
726 }
727
728 pub async fn close_confirmed_contract(
730 &self,
731 contract_id: &ContractId,
732 attestations: Vec<(usize, OracleAttestation)>,
733 ) -> Result<Contract, Error> {
734 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
735 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
736 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
737
738 if let Some((contract_info, adaptor_info)) =
740 contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
741 let matches = attestations
742 .iter()
743 .filter(|(i, a)| {
744 c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
745 })
746 .count();
747
748 matches >= c.threshold
749 })
750 {
751 let offer = &contract.accepted_contract.offered_contract;
752 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
753 let cet = crate::contract_updater::get_signed_cet(
754 &self.secp,
755 &contract,
756 contract_info,
757 adaptor_info,
758 &attestations,
759 &signer,
760 )?;
761
762 let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
764 .expect("Time is not in valid range. This should never happen.");
765 let height =
766 Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
767 .expect("Height is not in valid range. This should never happen.");
768 let locktime = cet.lock_time;
769
770 if !locktime.is_satisfied_by(height, time) {
771 return Err(Error::InvalidState(
772 "CET lock time has not passed yet".to_string(),
773 ));
774 }
775
776 match self
777 .close_contract(
778 &contract,
779 cet,
780 attestations.into_iter().map(|x| x.1).collect(),
781 )
782 .await
783 {
784 Ok(closed_contract) => {
785 self.store.update_contract(&closed_contract).await?;
786 Ok(closed_contract)
787 }
788 Err(e) => {
789 tracing::warn!(
790 "Failed to close contract {}: {e}",
791 contract.accepted_contract.get_contract_id_string()
792 );
793 Err(e)
794 }
795 }
796 } else {
797 Err(Error::InvalidState(
798 "Attestations did not match contract infos".to_string(),
799 ))
800 }
801 }
802
803 async fn check_preclosed_contracts(&self) -> Result<(), Error> {
804 for c in self.store.get_preclosed_contracts().await? {
805 if let Err(e) = self.check_preclosed_contract(&c).await {
806 tracing::error!(
807 "Error checking pre-closed contract {}: {}",
808 c.signed_contract.accepted_contract.get_contract_id_string(),
809 e
810 )
811 }
812 }
813
814 Ok(())
815 }
816
817 async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
818 let broadcasted_txid = contract.signed_cet.compute_txid();
819 let confirmations = self
820 .blockchain
821 .get_transaction_confirmations(&broadcasted_txid)
822 .await?;
823 if confirmations >= NB_CONFIRMATIONS {
824 let closed_contract = ClosedContract {
825 attestations: contract.attestations.clone(),
826 signed_cet: Some(contract.signed_cet.clone()),
827 contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
828 temporary_contract_id: contract
829 .signed_contract
830 .accepted_contract
831 .offered_contract
832 .id,
833 counter_party_id: contract
834 .signed_contract
835 .accepted_contract
836 .offered_contract
837 .counter_party,
838 pnl: contract
839 .signed_contract
840 .accepted_contract
841 .compute_pnl(&contract.signed_cet),
842 };
843 self.store
844 .update_contract(&Contract::Closed(closed_contract))
845 .await?;
846 }
847
848 Ok(())
849 }
850
851 async fn close_contract(
852 &self,
853 contract: &SignedContract,
854 signed_cet: Transaction,
855 attestations: Vec<OracleAttestation>,
856 ) -> Result<Contract, Error> {
857 let confirmations = self
858 .blockchain
859 .get_transaction_confirmations(&signed_cet.compute_txid())
860 .await?;
861
862 if confirmations < 1 {
863 tracing::info!(
864 txid = signed_cet.compute_txid().to_string(),
865 "Broadcasting signed CET."
866 );
867 self.blockchain.send_transaction(&signed_cet).await?;
872
873 let preclosed_contract = PreClosedContract {
874 signed_contract: contract.clone(),
875 attestations: Some(attestations),
876 signed_cet,
877 };
878
879 return Ok(Contract::PreClosed(preclosed_contract));
880 } else if confirmations < NB_CONFIRMATIONS {
881 let preclosed_contract = PreClosedContract {
882 signed_contract: contract.clone(),
883 attestations: Some(attestations),
884 signed_cet,
885 };
886
887 return Ok(Contract::PreClosed(preclosed_contract));
888 }
889
890 let closed_contract = ClosedContract {
891 attestations: Some(attestations.to_vec()),
892 pnl: contract.accepted_contract.compute_pnl(&signed_cet),
893 signed_cet: Some(signed_cet),
894 contract_id: contract.accepted_contract.get_contract_id(),
895 temporary_contract_id: contract.accepted_contract.offered_contract.id,
896 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
897 };
898
899 Ok(Contract::Closed(closed_contract))
900 }
901
902 async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
903 if contract
905 .accepted_contract
906 .dlc_transactions
907 .refund
908 .lock_time
909 .to_consensus_u32() as u64
910 <= self.time.unix_time_now()
911 {
912 let accepted_contract = &contract.accepted_contract;
913 let refund = accepted_contract.dlc_transactions.refund.clone();
914 let confirmations = self
915 .blockchain
916 .get_transaction_confirmations(&refund.compute_txid())
917 .await?;
918 if confirmations == 0 {
919 let offer = &contract.accepted_contract.offered_contract;
920 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
921 let refund =
922 crate::contract_updater::get_signed_refund(&self.secp, contract, &signer)?;
923 self.blockchain.send_transaction(&refund).await?;
924 }
925
926 self.store
927 .update_contract(&Contract::Refunded(contract.clone()))
928 .await?;
929 }
930
931 Ok(())
932 }
933
934 pub async fn on_counterparty_close(
937 &mut self,
938 contract: &SignedContract,
939 closing_tx: Transaction,
940 confirmations: u32,
941 ) -> Result<Contract, Error> {
942 if !closing_tx.input.iter().any(|i| {
944 i.previous_output
945 == contract
946 .accepted_contract
947 .dlc_transactions
948 .get_fund_outpoint()
949 }) {
950 return Err(Error::InvalidParameters(
951 "Closing tx does not spend the funding tx".to_string(),
952 ));
953 }
954
955 if contract
957 .accepted_contract
958 .dlc_transactions
959 .refund
960 .compute_txid()
961 == closing_tx.compute_txid()
962 {
963 let refunded = Contract::Refunded(contract.clone());
964 self.store.update_contract(&refunded).await?;
965 return Ok(refunded);
966 }
967
968 let contract = if confirmations < NB_CONFIRMATIONS {
969 Contract::PreClosed(PreClosedContract {
970 signed_contract: contract.clone(),
971 attestations: None, signed_cet: closing_tx,
973 })
974 } else {
975 Contract::Closed(ClosedContract {
976 attestations: None, pnl: contract.accepted_contract.compute_pnl(&closing_tx),
978 signed_cet: Some(closing_tx),
979 contract_id: contract.accepted_contract.get_contract_id(),
980 temporary_contract_id: contract.accepted_contract.offered_contract.id,
981 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
982 })
983 };
984
985 self.store.update_contract(&contract).await?;
986
987 Ok(contract)
988 }
989}
990
991impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
992 Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
993where
994 W::Target: Wallet,
995 SP::Target: ContractSignerProvider<Signer = X>,
996 B::Target: Blockchain,
997 S::Target: Storage,
998 O::Target: Oracle,
999 T::Target: Time,
1000 F::Target: FeeEstimator,
1001{
1002 pub async fn offer_channel(
1005 &self,
1006 contract_input: &ContractInput,
1007 counter_party: PublicKey,
1008 ) -> Result<OfferChannel, Error> {
1009 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1010
1011 let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1012 &self.secp,
1013 contract_input,
1014 &counter_party,
1015 &oracle_announcements,
1016 CET_NSEQUENCE,
1017 REFUND_DELAY,
1018 &self.wallet,
1019 &self.signer_provider,
1020 &self.blockchain,
1021 &self.time,
1022 crate::utils::get_new_temporary_id(),
1023 )
1024 .await?;
1025
1026 let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1027
1028 self.store
1029 .upsert_channel(
1030 Channel::Offered(offered_channel),
1031 Some(Contract::Offered(offered_contract)),
1032 )
1033 .await?;
1034
1035 Ok(msg)
1036 }
1037
1038 pub async fn reject_channel(
1041 &self,
1042 channel_id: &ChannelId,
1043 ) -> Result<(Reject, PublicKey), Error> {
1044 let offered_channel =
1045 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1046
1047 if offered_channel.is_offer_party {
1048 return Err(Error::InvalidState(
1049 "Cannot reject channel initiated by us.".to_string(),
1050 ));
1051 }
1052
1053 let offered_contract = get_contract_in_state!(
1054 self,
1055 &offered_channel.offered_contract_id,
1056 Offered,
1057 None as Option<PublicKey>
1058 )?;
1059
1060 let counterparty = offered_channel.counter_party;
1061 self.store
1062 .upsert_channel(
1063 Channel::Cancelled(offered_channel),
1064 Some(Contract::Rejected(offered_contract)),
1065 )
1066 .await?;
1067
1068 let msg = Reject {
1069 channel_id: *channel_id,
1070 };
1071 Ok((msg, counterparty))
1072 }
1073
1074 pub async fn accept_channel(
1078 &self,
1079 channel_id: &ChannelId,
1080 ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1081 let offered_channel =
1082 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1083
1084 if offered_channel.is_offer_party {
1085 return Err(Error::InvalidState(
1086 "Cannot accept channel initiated by us.".to_string(),
1087 ));
1088 }
1089
1090 let offered_contract = get_contract_in_state!(
1091 self,
1092 &offered_channel.offered_contract_id,
1093 Offered,
1094 None as Option<PublicKey>
1095 )?;
1096
1097 let (accepted_channel, accepted_contract, accept_channel) =
1098 crate::channel_updater::accept_channel_offer(
1099 &self.secp,
1100 &offered_channel,
1101 &offered_contract,
1102 &self.wallet,
1103 &self.signer_provider,
1104 &self.blockchain,
1105 )
1106 .await?;
1107
1108 self.wallet.import_address(&Address::p2wsh(
1109 &accepted_contract.dlc_transactions.funding_script_pubkey,
1110 self.blockchain.get_network()?,
1111 ))?;
1112
1113 let channel_id = accepted_channel.channel_id;
1114 let contract_id = accepted_contract.get_contract_id();
1115 let counter_party = accepted_contract.offered_contract.counter_party;
1116
1117 self.store
1118 .upsert_channel(
1119 Channel::Accepted(accepted_channel),
1120 Some(Contract::Accepted(accepted_contract)),
1121 )
1122 .await?;
1123
1124 Ok((accept_channel, channel_id, contract_id, counter_party))
1125 }
1126
1127 pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1129 let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1130
1131 self.force_close_channel_internal(channel, true).await
1132 }
1133
1134 pub async fn settle_offer(
1138 &self,
1139 channel_id: &ChannelId,
1140 counter_payout: u64,
1141 ) -> Result<(SettleOffer, PublicKey), Error> {
1142 let mut signed_channel =
1143 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1144
1145 let msg = crate::channel_updater::settle_channel_offer(
1146 &self.secp,
1147 &mut signed_channel,
1148 counter_payout,
1149 PEER_TIMEOUT,
1150 &self.signer_provider,
1151 &self.time,
1152 )?;
1153
1154 let counter_party = signed_channel.counter_party;
1155
1156 self.store
1157 .upsert_channel(Channel::Signed(signed_channel), None)
1158 .await?;
1159
1160 Ok((msg, counter_party))
1161 }
1162
1163 pub async fn accept_settle_offer(
1166 &self,
1167 channel_id: &ChannelId,
1168 ) -> Result<(SettleAccept, PublicKey), Error> {
1169 let mut signed_channel =
1170 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1171
1172 let msg = crate::channel_updater::settle_channel_accept(
1173 &self.secp,
1174 &mut signed_channel,
1175 CET_NSEQUENCE,
1176 0,
1177 PEER_TIMEOUT,
1178 &self.signer_provider,
1179 &self.time,
1180 &self.chain_monitor,
1181 )
1182 .await?;
1183
1184 let counter_party = signed_channel.counter_party;
1185
1186 self.store
1187 .upsert_channel(Channel::Signed(signed_channel), None)
1188 .await?;
1189
1190 Ok((msg, counter_party))
1191 }
1192
1193 pub async fn renew_offer(
1197 &self,
1198 channel_id: &ChannelId,
1199 counter_payout: u64,
1200 contract_input: &ContractInput,
1201 ) -> Result<(RenewOffer, PublicKey), Error> {
1202 let mut signed_channel =
1203 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1204
1205 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1206
1207 let (msg, offered_contract) = crate::channel_updater::renew_offer(
1208 &self.secp,
1209 &mut signed_channel,
1210 contract_input,
1211 oracle_announcements,
1212 counter_payout,
1213 REFUND_DELAY,
1214 PEER_TIMEOUT,
1215 CET_NSEQUENCE,
1216 &self.signer_provider,
1217 &self.time,
1218 )?;
1219
1220 let counter_party = offered_contract.counter_party;
1221
1222 self.store
1223 .upsert_channel(
1224 Channel::Signed(signed_channel),
1225 Some(Contract::Offered(offered_contract)),
1226 )
1227 .await?;
1228
1229 Ok((msg, counter_party))
1230 }
1231
1232 pub async fn accept_renew_offer(
1236 &self,
1237 channel_id: &ChannelId,
1238 ) -> Result<(RenewAccept, PublicKey), Error> {
1239 let mut signed_channel =
1240 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1241 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1242 Error::InvalidState("Expected to have a contract id but did not.".to_string())
1243 })?;
1244
1245 let offered_contract = get_contract_in_state!(
1246 self,
1247 &offered_contract_id,
1248 Offered,
1249 None as Option<PublicKey>
1250 )?;
1251
1252 let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1253 &self.secp,
1254 &mut signed_channel,
1255 &offered_contract,
1256 CET_NSEQUENCE,
1257 PEER_TIMEOUT,
1258 &self.signer_provider,
1259 &self.time,
1260 )?;
1261
1262 let counter_party = signed_channel.counter_party;
1263
1264 self.store
1265 .upsert_channel(
1266 Channel::Signed(signed_channel),
1267 Some(Contract::Accepted(accepted_contract)),
1268 )
1269 .await?;
1270
1271 Ok((msg, counter_party))
1272 }
1273
1274 pub async fn reject_renew_offer(
1278 &self,
1279 channel_id: &ChannelId,
1280 ) -> Result<(Reject, PublicKey), Error> {
1281 let mut signed_channel =
1282 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1283 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1284 Error::InvalidState(
1285 "Expected to be in a state with an associated contract id but was not.".to_string(),
1286 )
1287 })?;
1288
1289 let offered_contract = get_contract_in_state!(
1290 self,
1291 &offered_contract_id,
1292 Offered,
1293 None as Option<PublicKey>
1294 )?;
1295
1296 let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1297
1298 let counter_party = signed_channel.counter_party;
1299
1300 self.store
1301 .upsert_channel(
1302 Channel::Signed(signed_channel),
1303 Some(Contract::Rejected(offered_contract)),
1304 )
1305 .await?;
1306
1307 Ok((reject_msg, counter_party))
1308 }
1309
1310 pub async fn reject_settle_offer(
1314 &self,
1315 channel_id: &ChannelId,
1316 ) -> Result<(Reject, PublicKey), Error> {
1317 let mut signed_channel =
1318 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1319
1320 let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1321
1322 let counter_party = signed_channel.counter_party;
1323
1324 self.store
1325 .upsert_channel(Channel::Signed(signed_channel), None)
1326 .await?;
1327
1328 Ok((msg, counter_party))
1329 }
1330
1331 pub async fn offer_collaborative_close(
1336 &self,
1337 channel_id: &ChannelId,
1338 counter_payout: u64,
1339 ) -> Result<CollaborativeCloseOffer, Error> {
1340 let mut signed_channel =
1341 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1342
1343 let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
1344 &self.secp,
1345 &mut signed_channel,
1346 counter_payout,
1347 &self.signer_provider,
1348 &self.time,
1349 )?;
1350
1351 self.chain_monitor.lock().await.add_tx(
1352 close_tx.compute_txid(),
1353 ChannelInfo {
1354 channel_id: *channel_id,
1355 tx_type: TxType::CollaborativeClose,
1356 },
1357 );
1358
1359 self.store
1360 .upsert_channel(Channel::Signed(signed_channel), None)
1361 .await?;
1362 self.store
1363 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1364 .await?;
1365
1366 Ok(msg)
1367 }
1368
1369 pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
1372 let signed_channel =
1373 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1374
1375 let closed_contract = if let Some(SignedChannelState::Established {
1376 signed_contract_id,
1377 ..
1378 }) = &signed_channel.roll_back_state
1379 {
1380 let counter_payout = get_signed_channel_state!(
1381 signed_channel,
1382 CollaborativeCloseOffered,
1383 counter_payout
1384 )?;
1385 Some(
1386 self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
1387 .await?,
1388 )
1389 } else {
1390 None
1391 };
1392
1393 let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
1394 &self.secp,
1395 &signed_channel,
1396 &self.signer_provider,
1397 )?;
1398
1399 self.blockchain.send_transaction(&close_tx).await?;
1400
1401 self.store.upsert_channel(closed_channel, None).await?;
1402
1403 if let Some(closed_contract) = closed_contract {
1404 self.store
1405 .update_contract(&Contract::Closed(closed_contract))
1406 .await?;
1407 }
1408
1409 Ok(())
1410 }
1411
1412 async fn try_finalize_closing_established_channel(
1413 &self,
1414 signed_channel: SignedChannel,
1415 ) -> Result<(), Error> {
1416 let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
1417 signed_channel,
1418 Closing,
1419 buffer_transaction,
1420 contract_id,
1421 is_initiator
1422 )?;
1423
1424 if self
1425 .blockchain
1426 .get_transaction_confirmations(&buffer_tx.compute_txid())
1427 .await?
1428 >= CET_NSEQUENCE
1429 {
1430 tracing::info!(
1431 "Buffer transaction for contract {} has enough confirmations to spend from it",
1432 serialize_hex(&contract_id)
1433 );
1434
1435 let confirmed_contract =
1436 get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
1437
1438 let (contract_info, adaptor_info, attestations) = self
1439 .get_closable_contract_info(&confirmed_contract)
1440 .await
1441 .ok_or_else(|| {
1442 Error::InvalidState("Could not get information to close contract".to_string())
1443 })?;
1444
1445 let (signed_cet, closed_channel) =
1446 crate::channel_updater::finalize_unilateral_close_settled_channel(
1447 &self.secp,
1448 &signed_channel,
1449 &confirmed_contract,
1450 contract_info,
1451 &attestations,
1452 adaptor_info,
1453 &self.signer_provider,
1454 is_initiator,
1455 )?;
1456
1457 let closed_contract = self
1458 .close_contract(
1459 &confirmed_contract,
1460 signed_cet,
1461 attestations.iter().map(|x| &x.1).cloned().collect(),
1462 )
1463 .await?;
1464
1465 self.chain_monitor
1466 .lock()
1467 .await
1468 .cleanup_channel(signed_channel.channel_id);
1469
1470 self.store
1471 .upsert_channel(closed_channel, Some(closed_contract))
1472 .await?;
1473 }
1474
1475 Ok(())
1476 }
1477
1478 async fn on_offer_channel(
1479 &self,
1480 offer_channel: &OfferChannel,
1481 counter_party: PublicKey,
1482 ) -> Result<(), Error> {
1483 offer_channel.validate(
1484 &self.secp,
1485 REFUND_DELAY,
1486 REFUND_DELAY * 2,
1487 CET_NSEQUENCE,
1488 CET_NSEQUENCE * 2,
1489 )?;
1490
1491 let keys_id = self
1492 .signer_provider
1493 .derive_signer_key_id(false, offer_channel.temporary_contract_id);
1494 let (channel, contract) =
1495 OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
1496
1497 contract.validate()?;
1498
1499 if self
1500 .store
1501 .get_channel(&channel.temporary_channel_id)
1502 .await?
1503 .is_some()
1504 {
1505 return Err(Error::InvalidParameters(
1506 "Channel with identical idea already in store".to_string(),
1507 ));
1508 }
1509
1510 self.store
1511 .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
1512 .await?;
1513
1514 Ok(())
1515 }
1516
1517 async fn on_accept_channel(
1518 &self,
1519 accept_channel: &AcceptChannel,
1520 peer_id: &PublicKey,
1521 ) -> Result<SignChannel, Error> {
1522 let offered_channel = get_channel_in_state!(
1523 self,
1524 &accept_channel.temporary_channel_id,
1525 Offered,
1526 Some(*peer_id)
1527 )?;
1528 let offered_contract = get_contract_in_state!(
1529 self,
1530 &offered_channel.offered_contract_id,
1531 Offered,
1532 Some(*peer_id)
1533 )?;
1534
1535 let (signed_channel, signed_contract, sign_channel) = {
1536 let res = crate::channel_updater::verify_and_sign_accepted_channel(
1537 &self.secp,
1538 &offered_channel,
1539 &offered_contract,
1540 accept_channel,
1541 CET_NSEQUENCE,
1543 &self.wallet,
1544 &self.signer_provider,
1545 &self.chain_monitor,
1546 )
1547 .await;
1548
1549 match res {
1550 Ok(res) => res,
1551 Err(e) => {
1552 let channel = crate::channel::FailedAccept {
1553 temporary_channel_id: accept_channel.temporary_channel_id,
1554 error_message: format!("Error validating accept channel: {}", e),
1555 accept_message: accept_channel.clone(),
1556 counter_party: *peer_id,
1557 };
1558 self.store
1559 .upsert_channel(Channel::FailedAccept(channel), None)
1560 .await?;
1561 return Err(e);
1562 }
1563 }
1564 };
1565
1566 self.wallet.import_address(&Address::p2wsh(
1567 &signed_contract
1568 .accepted_contract
1569 .dlc_transactions
1570 .funding_script_pubkey,
1571 self.blockchain.get_network()?,
1572 ))?;
1573
1574 if let SignedChannelState::Established {
1575 buffer_transaction, ..
1576 } = &signed_channel.state
1577 {
1578 self.chain_monitor.lock().await.add_tx(
1579 buffer_transaction.compute_txid(),
1580 ChannelInfo {
1581 channel_id: signed_channel.channel_id,
1582 tx_type: TxType::BufferTx,
1583 },
1584 );
1585 } else {
1586 unreachable!();
1587 }
1588
1589 self.store
1590 .upsert_channel(
1591 Channel::Signed(signed_channel),
1592 Some(Contract::Signed(signed_contract)),
1593 )
1594 .await?;
1595
1596 self.store
1597 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1598 .await?;
1599
1600 Ok(sign_channel)
1601 }
1602
1603 async fn on_sign_channel(
1604 &self,
1605 sign_channel: &SignChannel,
1606 peer_id: &PublicKey,
1607 ) -> Result<(), Error> {
1608 let accepted_channel =
1609 get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
1610 let accepted_contract = get_contract_in_state!(
1611 self,
1612 &accepted_channel.accepted_contract_id,
1613 Accepted,
1614 Some(*peer_id)
1615 )?;
1616
1617 let (signed_channel, signed_contract, signed_fund_tx) = {
1618 let res = verify_signed_channel(
1619 &self.secp,
1620 &accepted_channel,
1621 &accepted_contract,
1622 sign_channel,
1623 &self.wallet,
1624 &self.chain_monitor,
1625 )
1626 .await;
1627
1628 match res {
1629 Ok(res) => res,
1630 Err(e) => {
1631 let channel = crate::channel::FailedSign {
1632 channel_id: sign_channel.channel_id,
1633 error_message: format!("Error validating accept channel: {}", e),
1634 sign_message: sign_channel.clone(),
1635 counter_party: *peer_id,
1636 };
1637 self.store
1638 .upsert_channel(Channel::FailedSign(channel), None)
1639 .await?;
1640 return Err(e);
1641 }
1642 }
1643 };
1644
1645 if let SignedChannelState::Established {
1646 buffer_transaction, ..
1647 } = &signed_channel.state
1648 {
1649 self.chain_monitor.lock().await.add_tx(
1650 buffer_transaction.compute_txid(),
1651 ChannelInfo {
1652 channel_id: signed_channel.channel_id,
1653 tx_type: TxType::BufferTx,
1654 },
1655 );
1656 } else {
1657 unreachable!();
1658 }
1659
1660 self.blockchain.send_transaction(&signed_fund_tx).await?;
1661
1662 self.store
1663 .upsert_channel(
1664 Channel::Signed(signed_channel),
1665 Some(Contract::Signed(signed_contract)),
1666 )
1667 .await?;
1668 self.store
1669 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1670 .await?;
1671
1672 Ok(())
1673 }
1674
1675 async fn on_settle_offer(
1676 &self,
1677 settle_offer: &SettleOffer,
1678 peer_id: &PublicKey,
1679 ) -> Result<Option<Reject>, Error> {
1680 let mut signed_channel =
1681 get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
1682
1683 if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
1684 return Ok(Some(Reject {
1685 channel_id: settle_offer.channel_id,
1686 }));
1687 }
1688
1689 crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
1690
1691 self.store
1692 .upsert_channel(Channel::Signed(signed_channel), None)
1693 .await?;
1694
1695 Ok(None)
1696 }
1697
1698 async fn on_settle_accept(
1699 &self,
1700 settle_accept: &SettleAccept,
1701 peer_id: &PublicKey,
1702 ) -> Result<SettleConfirm, Error> {
1703 let mut signed_channel =
1704 get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
1705
1706 let msg = crate::channel_updater::settle_channel_confirm(
1707 &self.secp,
1708 &mut signed_channel,
1709 settle_accept,
1710 CET_NSEQUENCE,
1711 0,
1712 PEER_TIMEOUT,
1713 &self.signer_provider,
1714 &self.time,
1715 &self.chain_monitor,
1716 )
1717 .await?;
1718
1719 self.store
1720 .upsert_channel(Channel::Signed(signed_channel), None)
1721 .await?;
1722
1723 Ok(msg)
1724 }
1725
1726 async fn on_settle_confirm(
1727 &self,
1728 settle_confirm: &SettleConfirm,
1729 peer_id: &PublicKey,
1730 ) -> Result<SettleFinalize, Error> {
1731 let mut signed_channel =
1732 get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
1733 let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
1734 let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
1735 signed_channel,
1736 Established,
1737 buffer_transaction,
1738 own_buffer_adaptor_signature,
1739 is_offer,
1740 signed_contract_id
1741 )?;
1742
1743 let prev_buffer_txid = prev_buffer_tx.compute_txid();
1744 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
1745 let is_offer = *is_offer;
1746 let signed_contract_id = *signed_contract_id;
1747
1748 let msg = crate::channel_updater::settle_channel_finalize(
1749 &self.secp,
1750 &mut signed_channel,
1751 settle_confirm,
1752 &self.signer_provider,
1753 )?;
1754
1755 self.chain_monitor.lock().await.add_tx(
1756 prev_buffer_txid,
1757 ChannelInfo {
1758 channel_id: signed_channel.channel_id,
1759 tx_type: TxType::Revoked {
1760 update_idx: signed_channel.update_idx + 1,
1761 own_adaptor_signature: own_buffer_adaptor_signature,
1762 is_offer,
1763 revoked_tx_type: RevokedTxType::Buffer,
1764 },
1765 },
1766 );
1767
1768 let closed_contract = Contract::Closed(
1769 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
1770 .await?,
1771 );
1772
1773 self.store
1774 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
1775 .await?;
1776 self.store
1777 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1778 .await?;
1779
1780 Ok(msg)
1781 }
1782
1783 async fn on_settle_finalize(
1784 &self,
1785 settle_finalize: &SettleFinalize,
1786 peer_id: &PublicKey,
1787 ) -> Result<(), Error> {
1788 let mut signed_channel =
1789 get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
1790 let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
1791 let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
1792 signed_channel,
1793 Established,
1794 buffer_transaction,
1795 own_buffer_adaptor_signature,
1796 is_offer,
1797 signed_contract_id
1798 )?;
1799
1800 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
1801 let is_offer = *is_offer;
1802 let buffer_txid = buffer_tx.compute_txid();
1803 let signed_contract_id = *signed_contract_id;
1804
1805 crate::channel_updater::settle_channel_on_finalize(
1806 &self.secp,
1807 &mut signed_channel,
1808 settle_finalize,
1809 )?;
1810
1811 self.chain_monitor.lock().await.add_tx(
1812 buffer_txid,
1813 ChannelInfo {
1814 channel_id: signed_channel.channel_id,
1815 tx_type: TxType::Revoked {
1816 update_idx: signed_channel.update_idx + 1,
1817 own_adaptor_signature: own_buffer_adaptor_signature,
1818 is_offer,
1819 revoked_tx_type: RevokedTxType::Buffer,
1820 },
1821 },
1822 );
1823
1824 let closed_contract = Contract::Closed(
1825 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
1826 .await?,
1827 );
1828 self.store
1829 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
1830 .await?;
1831 self.store
1832 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1833 .await?;
1834
1835 Ok(())
1836 }
1837
1838 async fn on_renew_offer(
1839 &self,
1840 renew_offer: &RenewOffer,
1841 peer_id: &PublicKey,
1842 ) -> Result<Option<Reject>, Error> {
1843 let mut signed_channel =
1844 get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
1845
1846 if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
1848 if is_offer {
1849 return Ok(Some(Reject {
1850 channel_id: renew_offer.channel_id,
1851 }));
1852 }
1853 }
1854
1855 let offered_contract = crate::channel_updater::on_renew_offer(
1856 &mut signed_channel,
1857 renew_offer,
1858 PEER_TIMEOUT,
1859 &self.time,
1860 )?;
1861
1862 self.store.create_contract(&offered_contract).await?;
1863 self.store
1864 .upsert_channel(Channel::Signed(signed_channel), None)
1865 .await?;
1866
1867 Ok(None)
1868 }
1869
1870 async fn on_renew_accept(
1871 &self,
1872 renew_accept: &RenewAccept,
1873 peer_id: &PublicKey,
1874 ) -> Result<RenewConfirm, Error> {
1875 let mut signed_channel =
1876 get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
1877 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1878 Error::InvalidState(
1879 "Expected to be in a state with an associated contract id but was not.".to_string(),
1880 )
1881 })?;
1882
1883 let offered_contract =
1884 get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
1885
1886 let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
1887 &self.secp,
1888 renew_accept,
1889 &mut signed_channel,
1890 &offered_contract,
1891 CET_NSEQUENCE,
1892 PEER_TIMEOUT,
1893 &self.wallet,
1894 &self.signer_provider,
1895 &self.time,
1896 )?;
1897
1898 self.store
1900 .upsert_channel(
1901 Channel::Signed(signed_channel),
1902 Some(Contract::Confirmed(signed_contract)),
1903 )
1904 .await?;
1905
1906 Ok(msg)
1907 }
1908
1909 async fn on_renew_confirm(
1910 &self,
1911 renew_confirm: &RenewConfirm,
1912 peer_id: &PublicKey,
1913 ) -> Result<RenewFinalize, Error> {
1914 let mut signed_channel =
1915 get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
1916 let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
1917 let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1918 Error::InvalidState(
1919 "Expected to be in a state with an associated contract id but was not.".to_string(),
1920 )
1921 })?;
1922
1923 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
1924 .roll_back_state
1925 .as_ref()
1926 .expect("to have a rollback state")
1927 {
1928 SignedChannelState::Established {
1929 own_buffer_adaptor_signature,
1930 buffer_transaction,
1931 signed_contract_id,
1932 ..
1933 } => {
1934 let closed_contract = Contract::Closed(
1935 self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
1936 .await?,
1937 );
1938 (
1939 TxType::Revoked {
1940 update_idx: signed_channel.update_idx,
1941 own_adaptor_signature: *own_buffer_adaptor_signature,
1942 is_offer: false,
1943 revoked_tx_type: RevokedTxType::Buffer,
1944 },
1945 buffer_transaction.compute_txid(),
1946 Some(closed_contract),
1947 )
1948 }
1949 SignedChannelState::Settled {
1950 settle_tx,
1951 own_settle_adaptor_signature,
1952 ..
1953 } => (
1954 TxType::Revoked {
1955 update_idx: signed_channel.update_idx,
1956 own_adaptor_signature: *own_settle_adaptor_signature,
1957 is_offer: false,
1958 revoked_tx_type: RevokedTxType::Settle,
1959 },
1960 settle_tx.compute_txid(),
1961 None,
1962 ),
1963 s => {
1964 return Err(Error::InvalidState(format!(
1965 "Expected rollback state Established or Revoked but found {s:?}"
1966 )))
1967 }
1968 };
1969 let accepted_contract =
1970 get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
1971
1972 let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
1973 &self.secp,
1974 &mut signed_channel,
1975 &accepted_contract,
1976 renew_confirm,
1977 PEER_TIMEOUT,
1978 &self.time,
1979 &self.wallet,
1980 &self.signer_provider,
1981 &self.chain_monitor,
1982 )
1983 .await?;
1984
1985 self.chain_monitor.lock().await.add_tx(
1986 prev_tx_id,
1987 ChannelInfo {
1988 channel_id: signed_channel.channel_id,
1989 tx_type,
1990 },
1991 );
1992
1993 self.store
1995 .upsert_channel(
1996 Channel::Signed(signed_channel),
1997 Some(Contract::Confirmed(signed_contract)),
1998 )
1999 .await?;
2000
2001 self.store
2002 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2003 .await?;
2004
2005 if let Some(closed_contract) = closed_contract {
2006 self.store.update_contract(&closed_contract).await?;
2007 }
2008
2009 Ok(msg)
2010 }
2011
2012 async fn on_renew_finalize(
2013 &self,
2014 renew_finalize: &RenewFinalize,
2015 peer_id: &PublicKey,
2016 ) -> Result<RenewRevoke, Error> {
2017 let mut signed_channel =
2018 get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2019 let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2020
2021 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2022 .roll_back_state
2023 .as_ref()
2024 .expect("to have a rollback state")
2025 {
2026 SignedChannelState::Established {
2027 own_buffer_adaptor_signature,
2028 buffer_transaction,
2029 signed_contract_id,
2030 ..
2031 } => {
2032 let closed_contract = self
2033 .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2034 .await?;
2035 (
2036 TxType::Revoked {
2037 update_idx: signed_channel.update_idx,
2038 own_adaptor_signature: *own_buffer_adaptor_signature,
2039 is_offer: false,
2040 revoked_tx_type: RevokedTxType::Buffer,
2041 },
2042 buffer_transaction.compute_txid(),
2043 Some(Contract::Closed(closed_contract)),
2044 )
2045 }
2046 SignedChannelState::Settled {
2047 settle_tx,
2048 own_settle_adaptor_signature,
2049 ..
2050 } => (
2051 TxType::Revoked {
2052 update_idx: signed_channel.update_idx,
2053 own_adaptor_signature: *own_settle_adaptor_signature,
2054 is_offer: false,
2055 revoked_tx_type: RevokedTxType::Settle,
2056 },
2057 settle_tx.compute_txid(),
2058 None,
2059 ),
2060 s => {
2061 return Err(Error::InvalidState(format!(
2062 "Expected rollback state of Established or Settled but was {s:?}"
2063 )))
2064 }
2065 };
2066
2067 let msg = crate::channel_updater::renew_channel_on_finalize(
2068 &self.secp,
2069 &mut signed_channel,
2070 renew_finalize,
2071 &self.signer_provider,
2072 )?;
2073
2074 self.chain_monitor.lock().await.add_tx(
2075 prev_tx_id,
2076 ChannelInfo {
2077 channel_id: signed_channel.channel_id,
2078 tx_type,
2079 },
2080 );
2081
2082 let buffer_tx =
2083 get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2084
2085 self.chain_monitor.lock().await.add_tx(
2086 buffer_tx.compute_txid(),
2087 ChannelInfo {
2088 channel_id: signed_channel.channel_id,
2089 tx_type: TxType::BufferTx,
2090 },
2091 );
2092
2093 self.store
2094 .upsert_channel(Channel::Signed(signed_channel), None)
2095 .await?;
2096 self.store
2097 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2098 .await?;
2099
2100 if let Some(closed_contract) = closed_contract {
2101 self.store.update_contract(&closed_contract).await?;
2102 }
2103
2104 Ok(msg)
2105 }
2106
2107 async fn on_renew_revoke(
2108 &self,
2109 renew_revoke: &RenewRevoke,
2110 peer_id: &PublicKey,
2111 ) -> Result<(), Error> {
2112 let mut signed_channel =
2113 get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2114
2115 crate::channel_updater::renew_channel_on_revoke(
2116 &self.secp,
2117 &mut signed_channel,
2118 renew_revoke,
2119 )?;
2120
2121 self.store
2122 .upsert_channel(Channel::Signed(signed_channel), None)
2123 .await?;
2124
2125 Ok(())
2126 }
2127
2128 async fn on_collaborative_close_offer(
2129 &self,
2130 close_offer: &CollaborativeCloseOffer,
2131 peer_id: &PublicKey,
2132 ) -> Result<(), Error> {
2133 let mut signed_channel =
2134 get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2135
2136 crate::channel_updater::on_collaborative_close_offer(
2137 &mut signed_channel,
2138 close_offer,
2139 PEER_TIMEOUT,
2140 &self.time,
2141 )?;
2142
2143 self.store
2144 .upsert_channel(Channel::Signed(signed_channel), None)
2145 .await?;
2146
2147 Ok(())
2148 }
2149
2150 async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2151 let channel = self.store.get_channel(&reject.channel_id).await?;
2152
2153 if let Some(channel) = channel {
2154 if channel.get_counter_party_id() != *counter_party {
2155 return Err(Error::InvalidParameters(format!(
2156 "Peer {:02x?} is not involved with {} {:02x?}.",
2157 counter_party,
2158 stringify!(Channel),
2159 channel.get_id()
2160 )));
2161 }
2162 match channel {
2163 Channel::Offered(offered_channel) => {
2164 let offered_contract = get_contract_in_state!(
2165 self,
2166 &offered_channel.offered_contract_id,
2167 Offered,
2168 None as Option<PublicKey>
2169 )?;
2170 let utxos = offered_contract
2171 .funding_inputs
2172 .iter()
2173 .map(|funding_input| {
2174 let txid = Transaction::consensus_decode(
2175 &mut funding_input.prev_tx.as_slice(),
2176 )
2177 .expect("Transaction Decode Error")
2178 .compute_txid();
2179 let vout = funding_input.prev_tx_vout;
2180 OutPoint { txid, vout }
2181 })
2182 .collect::<Vec<_>>();
2183
2184 self.wallet.unreserve_utxos(&utxos)?;
2185
2186 self.store
2188 .upsert_channel(
2189 Channel::Cancelled(offered_channel),
2190 Some(Contract::Rejected(offered_contract)),
2191 )
2192 .await?;
2193 }
2194 Channel::Signed(mut signed_channel) => {
2195 let contract = match signed_channel.state {
2196 SignedChannelState::RenewOffered {
2197 offered_contract_id,
2198 ..
2199 } => {
2200 let offered_contract = get_contract_in_state!(
2201 self,
2202 &offered_contract_id,
2203 Offered,
2204 None::<PublicKey>
2205 )?;
2206 Some(Contract::Rejected(offered_contract))
2207 }
2208 _ => None,
2209 };
2210
2211 crate::channel_updater::on_reject(&mut signed_channel)?;
2212
2213 self.store
2214 .upsert_channel(Channel::Signed(signed_channel), contract)
2215 .await?;
2216 }
2217 channel => {
2218 return Err(Error::InvalidState(format!(
2219 "Not in a state adequate to receive a reject message. {:?}",
2220 channel
2221 )))
2222 }
2223 }
2224 } else {
2225 tracing::warn!(
2226 "Couldn't find rejected dlc channel with id: {}",
2227 reject.channel_id.to_lower_hex_string()
2228 );
2229 }
2230
2231 Ok(())
2232 }
2233
2234 async fn channel_checks(&self) -> Result<(), Error> {
2235 let established_closing_channels = self
2236 .store
2237 .get_signed_channels(Some(SignedChannelStateType::Closing))
2238 .await?;
2239
2240 for channel in established_closing_channels {
2241 if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2242 tracing::error!("Error trying to close established channel: {}", e);
2243 }
2244 }
2245
2246 if let Err(e) = self.check_for_timed_out_channels().await {
2247 tracing::error!("Error checking timed out channels {}", e);
2248 }
2249 self.check_for_watched_tx().await
2250 }
2251
2252 async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
2253 check_for_timed_out_channels!(self, RenewOffered);
2254 check_for_timed_out_channels!(self, RenewAccepted);
2255 check_for_timed_out_channels!(self, RenewConfirmed);
2256 check_for_timed_out_channels!(self, SettledOffered);
2257 check_for_timed_out_channels!(self, SettledAccepted);
2258 check_for_timed_out_channels!(self, SettledConfirmed);
2259
2260 Ok(())
2261 }
2262
2263 pub(crate) async fn process_watched_txs(
2264 &self,
2265 watched_txs: Vec<(Transaction, ChannelInfo)>,
2266 ) -> Result<(), Error> {
2267 for (tx, channel_info) in watched_txs {
2268 let mut signed_channel = match get_channel_in_state!(
2269 self,
2270 &channel_info.channel_id,
2271 Signed,
2272 None as Option<PublicKey>
2273 ) {
2274 Ok(c) => c,
2275 Err(e) => {
2276 tracing::error!(
2277 "Could not retrieve channel {:?}: {}",
2278 channel_info.channel_id,
2279 e
2280 );
2281 continue;
2282 }
2283 };
2284
2285 let persist = match channel_info.tx_type {
2286 TxType::BufferTx => {
2287 let contract_id = signed_channel
2294 .get_contract_id()
2295 .expect("to have a contract id");
2296 let mut state = SignedChannelState::Closing {
2297 buffer_transaction: tx.clone(),
2298 is_initiator: false,
2299 contract_id,
2300 keys_id: signed_channel.keys_id().ok_or_else(|| {
2301 Error::InvalidState("Expected to have keys_id.".to_string())
2302 })?,
2303 };
2304 std::mem::swap(&mut signed_channel.state, &mut state);
2305
2306 signed_channel.roll_back_state = Some(state);
2307
2308 self.store
2309 .upsert_channel(Channel::Signed(signed_channel), None)
2310 .await?;
2311
2312 false
2313 }
2314 TxType::Revoked {
2315 update_idx,
2316 own_adaptor_signature,
2317 is_offer,
2318 revoked_tx_type,
2319 } => {
2320 let secret = signed_channel
2321 .counter_party_commitment_secrets
2322 .get_secret(update_idx)
2323 .expect("to be able to retrieve the per update secret");
2324 let counter_per_update_secret = SecretKey::from_slice(&secret)
2325 .expect("to be able to parse the counter per update secret.");
2326
2327 let per_update_seed_pk = signed_channel.own_per_update_seed;
2328
2329 let per_update_seed_sk = self
2330 .signer_provider
2331 .get_secret_key_for_pubkey(&per_update_seed_pk)?;
2332
2333 let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
2334 per_update_seed_sk.as_ref(),
2335 update_idx,
2336 ))
2337 .expect("a valid secret key.");
2338
2339 let per_update_point =
2340 PublicKey::from_secret_key(&self.secp, &per_update_secret);
2341
2342 let own_revocation_params = signed_channel.own_points.get_revokable_params(
2343 &self.secp,
2344 &signed_channel.counter_points.revocation_basepoint,
2345 &per_update_point,
2346 );
2347
2348 let counter_per_update_point =
2349 PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
2350
2351 let base_own_sk = self
2352 .signer_provider
2353 .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
2354
2355 let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
2356
2357 let counter_revocation_params =
2358 signed_channel.counter_points.get_revokable_params(
2359 &self.secp,
2360 &signed_channel.own_points.revocation_basepoint,
2361 &counter_per_update_point,
2362 );
2363
2364 let witness = if signed_channel.own_params.fund_pubkey
2365 < signed_channel.counter_params.fund_pubkey
2366 {
2367 tx.input[0].witness.to_vec().remove(1)
2368 } else {
2369 tx.input[0].witness.to_vec().remove(2)
2370 };
2371
2372 let sig_data = witness
2373 .iter()
2374 .take(witness.len() - 1)
2375 .cloned()
2376 .collect::<Vec<_>>();
2377 let own_sig = Signature::from_der(&sig_data)?;
2378
2379 let counter_sk = own_adaptor_signature.recover(
2380 &self.secp,
2381 &own_sig,
2382 &counter_revocation_params.publish_pk.inner,
2383 )?;
2384
2385 let own_revocation_base_secret =
2386 &self.signer_provider.get_secret_key_for_pubkey(
2387 &signed_channel.own_points.revocation_basepoint,
2388 )?;
2389
2390 let counter_revocation_sk = derive_private_revocation_key(
2391 &self.secp,
2392 &counter_per_update_secret,
2393 own_revocation_base_secret,
2394 );
2395
2396 let (offer_params, accept_params) = if is_offer {
2397 (&own_revocation_params, &counter_revocation_params)
2398 } else {
2399 (&counter_revocation_params, &own_revocation_params)
2400 };
2401
2402 let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
2403 lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
2404 ) / 250)
2405 .into();
2406
2407 let signed_tx = match revoked_tx_type {
2408 RevokedTxType::Buffer => {
2409 dlc::channel::create_and_sign_punish_buffer_transaction(
2410 &self.secp,
2411 offer_params,
2412 accept_params,
2413 &own_sk,
2414 &counter_sk,
2415 &counter_revocation_sk,
2416 &tx,
2417 &self.wallet.get_new_address()?,
2418 0,
2419 fee_rate_per_vb,
2420 )?
2421 }
2422 RevokedTxType::Settle => {
2423 dlc::channel::create_and_sign_punish_settle_transaction(
2424 &self.secp,
2425 offer_params,
2426 accept_params,
2427 &own_sk,
2428 &counter_sk,
2429 &counter_revocation_sk,
2430 &tx,
2431 &self.wallet.get_new_address()?,
2432 CET_NSEQUENCE,
2433 0,
2434 fee_rate_per_vb,
2435 is_offer,
2436 )?
2437 }
2438 };
2439
2440 self.blockchain.send_transaction(&signed_tx).await?;
2441
2442 let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
2443 counter_party: signed_channel.counter_party,
2444 temporary_channel_id: signed_channel.temporary_channel_id,
2445 channel_id: signed_channel.channel_id,
2446 punish_txid: signed_tx.compute_txid(),
2447 });
2448
2449 self.chain_monitor
2452 .lock()
2453 .await
2454 .cleanup_channel(signed_channel.channel_id);
2455 self.store.upsert_channel(closed_channel, None).await?;
2456 true
2457 }
2458 TxType::CollaborativeClose => {
2459 if let Some(SignedChannelState::Established {
2460 signed_contract_id, ..
2461 }) = signed_channel.roll_back_state
2462 {
2463 let counter_payout = get_signed_channel_state!(
2464 signed_channel,
2465 CollaborativeCloseOffered,
2466 counter_payout
2467 )?;
2468 let closed_contract = self
2469 .get_collaboratively_closed_contract(
2470 &signed_contract_id,
2471 *counter_payout,
2472 false,
2473 )
2474 .await?;
2475 self.store
2476 .update_contract(&Contract::Closed(closed_contract))
2477 .await?;
2478 }
2479 let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
2480 counter_party: signed_channel.counter_party,
2481 temporary_channel_id: signed_channel.temporary_channel_id,
2482 channel_id: signed_channel.channel_id,
2483 });
2484 self.chain_monitor
2485 .lock()
2486 .await
2487 .cleanup_channel(signed_channel.channel_id);
2488 self.store.upsert_channel(closed_channel, None).await?;
2489 true
2490 }
2491 TxType::SettleTx => {
2492 let closed_channel = Channel::CounterClosed(ClosedChannel {
2493 counter_party: signed_channel.counter_party,
2494 temporary_channel_id: signed_channel.temporary_channel_id,
2495 channel_id: signed_channel.channel_id,
2496 });
2497 self.chain_monitor
2498 .lock()
2499 .await
2500 .cleanup_channel(signed_channel.channel_id);
2501 self.store.upsert_channel(closed_channel, None).await?;
2502 true
2503 }
2504 TxType::Cet => {
2505 let contract_id = signed_channel.get_contract_id();
2506 let closed_channel = {
2507 match &signed_channel.state {
2508 SignedChannelState::Closing { is_initiator, .. } => {
2509 if *is_initiator {
2510 Channel::Closed(ClosedChannel {
2511 counter_party: signed_channel.counter_party,
2512 temporary_channel_id: signed_channel.temporary_channel_id,
2513 channel_id: signed_channel.channel_id,
2514 })
2515 } else {
2516 Channel::CounterClosed(ClosedChannel {
2517 counter_party: signed_channel.counter_party,
2518 temporary_channel_id: signed_channel.temporary_channel_id,
2519 channel_id: signed_channel.channel_id,
2520 })
2521 }
2522 }
2523 _ => {
2524 tracing::error!("Saw spending of buffer transaction without being in closing state");
2525 Channel::Closed(ClosedChannel {
2526 counter_party: signed_channel.counter_party,
2527 temporary_channel_id: signed_channel.temporary_channel_id,
2528 channel_id: signed_channel.channel_id,
2529 })
2530 }
2531 }
2532 };
2533
2534 self.chain_monitor
2535 .lock()
2536 .await
2537 .cleanup_channel(signed_channel.channel_id);
2538 let pre_closed_contract = futures::stream::iter(contract_id)
2539 .then(|contract_id| {
2540 let txn = tx.clone();
2541 async move {
2542 self.store.get_contract(&contract_id).await.map(|contract| {
2543 contract.and_then(|contract| match contract {
2544 Contract::Confirmed(signed_contract) => {
2545 Some(Contract::PreClosed(PreClosedContract {
2546 signed_contract,
2547 attestations: None,
2548 signed_cet: txn,
2549 }))
2550 }
2551 _ => None,
2552 })
2553 })
2554 }
2555 })
2556 .try_collect::<Vec<_>>()
2557 .await?
2558 .into_iter()
2559 .flatten()
2560 .next();
2561
2562 self.store
2563 .upsert_channel(closed_channel, pre_closed_contract)
2564 .await?;
2565
2566 true
2567 }
2568 };
2569
2570 if persist {
2571 self.store
2572 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2573 .await?;
2574 }
2575 }
2576 Ok(())
2577 }
2578
2579 async fn check_for_watched_tx(&self) -> Result<(), Error> {
2580 let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
2581
2582 self.process_watched_txs(confirmed_txs).await?;
2583
2584 self.store
2585 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2586 .await?;
2587
2588 Ok(())
2589 }
2590
2591 async fn force_close_channel_internal(
2592 &self,
2593 mut channel: SignedChannel,
2594 is_initiator: bool,
2595 ) -> Result<(), Error> {
2596 match &channel.state {
2597 SignedChannelState::Established {
2598 counter_buffer_adaptor_signature,
2599 buffer_transaction,
2600 ..
2601 } => {
2602 let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
2603 let buffer_transaction = buffer_transaction.clone();
2604 self.initiate_unilateral_close_established_channel(
2605 channel,
2606 is_initiator,
2607 counter_buffer_adaptor_signature,
2608 buffer_transaction,
2609 )
2610 .await
2611 }
2612 SignedChannelState::RenewFinalized {
2613 buffer_transaction,
2614 offer_buffer_adaptor_signature,
2615 ..
2616 } => {
2617 let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
2618 let buffer_transaction = buffer_transaction.clone();
2619 self.initiate_unilateral_close_established_channel(
2620 channel,
2621 is_initiator,
2622 offer_buffer_adaptor_signature,
2623 buffer_transaction,
2624 )
2625 .await
2626 }
2627 SignedChannelState::Settled { .. } => {
2628 self.close_settled_channel(channel, is_initiator).await
2629 }
2630 SignedChannelState::SettledOffered { .. }
2631 | SignedChannelState::SettledReceived { .. }
2632 | SignedChannelState::SettledAccepted { .. }
2633 | SignedChannelState::SettledConfirmed { .. }
2634 | SignedChannelState::RenewOffered { .. }
2635 | SignedChannelState::RenewAccepted { .. }
2636 | SignedChannelState::RenewConfirmed { .. }
2637 | SignedChannelState::CollaborativeCloseOffered { .. } => {
2638 channel.state = channel
2639 .roll_back_state
2640 .take()
2641 .expect("to have a rollback state");
2642 let channel_clone = channel.clone(); Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
2644 }
2645 SignedChannelState::Closing { .. } => Err(Error::InvalidState(
2646 "Channel is already closing.".to_string(),
2647 )),
2648 }
2649 }
2650
2651 async fn initiate_unilateral_close_established_channel(
2653 &self,
2654 mut signed_channel: SignedChannel,
2655 is_initiator: bool,
2656 buffer_adaptor_signature: EcdsaAdaptorSignature,
2657 buffer_transaction: Transaction,
2658 ) -> Result<(), Error> {
2659 let keys_id = signed_channel.keys_id().ok_or_else(|| {
2660 Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
2661 })?;
2662
2663 crate::channel_updater::initiate_unilateral_close_established_channel(
2664 &self.secp,
2665 &mut signed_channel,
2666 buffer_adaptor_signature,
2667 keys_id,
2668 buffer_transaction,
2669 &self.signer_provider,
2670 is_initiator,
2671 )?;
2672
2673 let buffer_transaction =
2674 get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
2675
2676 self.blockchain.send_transaction(buffer_transaction).await?;
2677
2678 self.chain_monitor
2679 .lock()
2680 .await
2681 .remove_tx(&buffer_transaction.compute_txid());
2682
2683 self.store
2684 .upsert_channel(Channel::Signed(signed_channel), None)
2685 .await?;
2686
2687 self.store
2688 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2689 .await?;
2690
2691 Ok(())
2692 }
2693
2694 async fn close_settled_channel(
2696 &self,
2697 signed_channel: SignedChannel,
2698 is_initiator: bool,
2699 ) -> Result<(), Error> {
2700 let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
2701 &self.secp,
2702 &signed_channel,
2703 &self.signer_provider,
2704 is_initiator,
2705 )?;
2706
2707 if self
2708 .blockchain
2709 .get_transaction_confirmations(&settle_tx.compute_txid())
2710 .await
2711 .unwrap_or(0)
2712 == 0
2713 {
2714 self.blockchain.send_transaction(&settle_tx).await?;
2715 }
2716
2717 self.chain_monitor
2718 .lock()
2719 .await
2720 .cleanup_channel(signed_channel.channel_id);
2721
2722 self.store.upsert_channel(closed_channel, None).await?;
2723
2724 Ok(())
2725 }
2726
2727 async fn get_collaboratively_closed_contract(
2728 &self,
2729 contract_id: &ContractId,
2730 payout: u64,
2731 is_own_payout: bool,
2732 ) -> Result<ClosedContract, Error> {
2733 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
2734 let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
2735 contract
2736 .accepted_contract
2737 .offered_contract
2738 .offer_params
2739 .collateral
2740 } else {
2741 contract.accepted_contract.accept_params.collateral
2742 };
2743 let own_payout = if is_own_payout {
2744 payout
2745 } else {
2746 contract.accepted_contract.offered_contract.total_collateral - payout
2747 };
2748 let pnl = own_payout as i64 - own_collateral as i64;
2749 Ok(ClosedContract {
2750 attestations: None,
2751 signed_cet: None,
2752 contract_id: *contract_id,
2753 temporary_contract_id: contract.accepted_contract.offered_contract.id,
2754 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
2755 pnl,
2756 })
2757 }
2758
2759 async fn oracle_announcements(
2760 &self,
2761 contract_input: &ContractInput,
2762 ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
2763 let announcements = stream::iter(contract_input.contract_infos.iter())
2764 .map(|x| {
2765 let future = self.get_oracle_announcements(&x.oracles);
2766 async move {
2767 match future.await {
2768 Ok(result) => Ok(result),
2769 Err(e) => {
2770 tracing::error!("Failed to get oracle announcements: {}", e);
2771 Err(e)
2772 }
2773 }
2774 }
2775 })
2776 .collect::<FuturesUnordered<_>>()
2777 .await
2778 .try_collect::<Vec<_>>()
2779 .await?;
2780 Ok(announcements)
2781 }
2782}