1use super::{
4 Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13 accepted_contract::AcceptedContract, contract_info::ContractInfo,
14 contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15 signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16 FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::{Address, Amount, SignedAmount};
26use bitcoin::{OutPoint, Transaction};
27use ddk_messages::channel::{
28 AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
29 RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
30 SettleOffer, SignChannel,
31};
32use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
33use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
34use futures::stream;
35use futures::stream::FuturesUnordered;
36use futures::{StreamExt, TryStreamExt};
37use hex::DisplayHex;
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40 build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use secp256k1_zkp::XOnlyPublicKey;
43use secp256k1_zkp::{
44 ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
45};
46use std::collections::HashMap;
47use std::ops::Deref;
48use std::string::ToString;
49use std::sync::Arc;
50use tokio::sync::Mutex;
51
/// Number of confirmations a transaction needs before the manager treats it as
/// final; used in this file as the threshold for moving contracts to the
/// `Confirmed` and `Closed` states.
pub const NB_CONFIRMATIONS: u32 = 6;
/// Refund delay handed to `offer_contract` when creating offers; incoming
/// offers are validated against this value and twice this value.
/// NOTE(review): 86400 * 7 is one week if the unit is seconds — confirm the unit.
pub const REFUND_DELAY: u32 = 86400 * 7;
/// nSequence value for CETs.
/// NOTE(review): not referenced in the visible part of this file; presumably
/// used by the channel code — confirm.
pub const CET_NSEQUENCE: u32 = 288;
/// Peer timeout.
/// NOTE(review): not referenced in the visible part of this file; presumably a
/// duration in seconds (3600 = one hour) — confirm.
pub const PEER_TIMEOUT: u64 = 3600;
61
/// Result of looking for a contract info that can be closed: the matching
/// `ContractInfo`, its paired `AdaptorInfo`, and the collected oracle
/// attestations, each tagged with a `usize` index. `None` when no contract
/// info has gathered enough attestations.
type ClosableContractInfo<'a> = Option<(
    &'a ContractInfo,
    &'a AdaptorInfo,
    Vec<(usize, OracleAttestation)>,
)>;
67
#[derive(Debug)]
/// Coordinates the life cycle of DLC contracts and channels: it reacts to peer
/// messages, persists state through the store, and talks to the wallet,
/// blockchain and oracles through the generic handles below.
///
/// Every collaborator is held behind a `Deref` handle so callers can pass
/// references, `Arc`s or boxes interchangeably.
pub struct Manager<
    W: Deref,
    SP: Deref,
    B: Deref,
    S: Deref,
    O: Deref,
    T: Deref,
    F: Deref,
    X: ContractSigner,
> where
    W::Target: Wallet,
    SP::Target: ContractSignerProvider<Signer = X>,
    B::Target: Blockchain,
    S::Target: Storage,
    O::Target: Oracle,
    T::Target: Time,
    F::Target: FeeEstimator,
{
    // Known oracles, looked up by their x-only public key.
    oracles: HashMap<XOnlyPublicKey, O>,
    // Wallet handle (e.g. used to import funding addresses).
    wallet: W,
    // Source of contract signers and signer key ids.
    signer_provider: SP,
    // Blockchain access: height, blocks, confirmations, broadcasting.
    blockchain: B,
    // Persistent storage for contracts, channels and the chain monitor.
    store: S,
    // Shared secp256k1 context.
    secp: Secp256k1<All>,
    // Chain monitor state, guarded by an async mutex.
    chain_monitor: Mutex<ChainMonitor>,
    // Clock abstraction providing the current unix time.
    time: T,
    // Fee estimator. NOTE(review): not used in the visible part of the file.
    fee_estimator: F,
}
98
// Convenience wrapper around `get_object_in_state!` for contracts: forwards
// the manager, contract id, expected state and optional peer id, fixing the
// object type to `Contract` and the store accessor to `get_contract`.
macro_rules! get_contract_in_state {
    ($manager: ident, $contract_id: expr, $state: ident, $peer_id: expr) => {{
        get_object_in_state!(
            $manager,
            $contract_id,
            $state,
            $peer_id,
            Contract,
            get_contract
        )
    }};
}
111
// Convenience wrapper around `get_object_in_state!` for channels: forwards
// the manager, channel id, expected state and optional peer id, fixing the
// object type to `Channel` and the store accessor to `get_channel`.
macro_rules! get_channel_in_state {
    ($manager: ident, $channel_id: expr, $state: ident, $peer_id: expr) => {{
        get_object_in_state!(
            $manager,
            $channel_id,
            $state,
            $peer_id,
            Channel,
            get_channel
        )
    }};
}
124
// Extracts the requested fields from a signed channel's *rollback* state.
//
// Evaluates to `Ok((field, ...))` with references to the listed fields when
// `roll_back_state` is `Some(SignedChannelState::$state { .. })`, and to
// `Err(Error::InvalidState(..))` otherwise. The error reports the rollback
// state that was actually found (the value that was matched on), not the
// channel's current state.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state{$($field,)* ..}) => Ok(($($field,)*)),
            _ => Err(Error::InvalidState(format!("Expected rollback state {} got {:?}", stringify!($state), $signed_channel.roll_back_state))),
        }
    }};
}
133
// Force closes every signed channel that is in state `$state` and whose
// `timeout` lies strictly before the manager's current unix time. Failures to
// force close are logged and do not stop the scan; a failure to load the
// channels is propagated with `?`.
macro_rules! check_for_timed_out_channels {
    ($manager: ident, $state: ident) => {
        let candidates = $manager
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))
            .await?;

        for signed_channel in candidates {
            // Channels reported in another state are ignored.
            let SignedChannelState::$state { timeout, .. } = signed_channel.state else {
                continue;
            };

            if timeout >= $manager.time.unix_time_now() {
                continue;
            }

            if let Err(e) = $manager
                .force_close_channel_internal(signed_channel, true)
                .await
            {
                tracing::error!("Error force closing channel {}", e);
            }
        }
    };
}
154
155impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
156 Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
157where
158 W::Target: Wallet,
159 SP::Target: ContractSignerProvider<Signer = X>,
160 B::Target: Blockchain,
161 S::Target: Storage,
162 O::Target: Oracle,
163 T::Target: Time,
164 F::Target: FeeEstimator,
165{
166 pub async fn new(
168 wallet: W,
169 signer_provider: SP,
170 blockchain: B,
171 store: S,
172 oracles: HashMap<XOnlyPublicKey, O>,
173 time: T,
174 fee_estimator: F,
175 ) -> Result<Self, Error> {
176 let init_height = blockchain.get_blockchain_height().await?;
177 let chain_monitor = Mutex::new(
178 store
179 .get_chain_monitor()
180 .await?
181 .unwrap_or(ChainMonitor::new(init_height)),
182 );
183
184 let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
185
186 Ok(Manager {
187 secp: secp256k1_zkp::Secp256k1::new(),
188 wallet,
189 signer_provider,
190 blockchain,
191 store,
192 oracles,
193 time,
194 fee_estimator,
195 chain_monitor,
196 })
197 }
198
    /// Returns a reference to the storage handle held by this manager.
    pub fn get_store(&self) -> &S {
        &self.store
    }
203
204 pub async fn on_dlc_message(
206 &self,
207 msg: &DlcMessage,
208 counter_party: PublicKey,
209 ) -> Result<Option<DlcMessage>, Error> {
210 match msg {
211 DlcMessage::Offer(o) => {
212 self.on_offer_message(o, counter_party).await?;
213 Ok(None)
214 }
215 DlcMessage::Accept(a) => Ok(Some(self.on_accept_message(a, &counter_party).await?)),
216 DlcMessage::Sign(s) => {
217 self.on_sign_message(s, &counter_party).await?;
218 Ok(None)
219 }
220 DlcMessage::Close(c) => {
221 self.on_close_message(c, &counter_party).await?;
222 Ok(None)
223 }
224 DlcMessage::OfferChannel(o) => {
225 self.on_offer_channel(o, counter_party).await?;
226 Ok(None)
227 }
228 DlcMessage::AcceptChannel(a) => Ok(Some(DlcMessage::SignChannel(
229 self.on_accept_channel(a, &counter_party).await?,
230 ))),
231 DlcMessage::SignChannel(s) => {
232 self.on_sign_channel(s, &counter_party).await?;
233 Ok(None)
234 }
235 DlcMessage::SettleOffer(s) => match self.on_settle_offer(s, &counter_party).await? {
236 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
237 None => Ok(None),
238 },
239 DlcMessage::SettleAccept(s) => Ok(Some(DlcMessage::SettleConfirm(
240 self.on_settle_accept(s, &counter_party).await?,
241 ))),
242 DlcMessage::SettleConfirm(s) => Ok(Some(DlcMessage::SettleFinalize(
243 self.on_settle_confirm(s, &counter_party).await?,
244 ))),
245 DlcMessage::SettleFinalize(s) => {
246 self.on_settle_finalize(s, &counter_party).await?;
247 Ok(None)
248 }
249 DlcMessage::RenewOffer(r) => match self.on_renew_offer(r, &counter_party).await? {
250 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
251 None => Ok(None),
252 },
253 DlcMessage::RenewAccept(r) => Ok(Some(DlcMessage::RenewConfirm(
254 self.on_renew_accept(r, &counter_party).await?,
255 ))),
256 DlcMessage::RenewConfirm(r) => Ok(Some(DlcMessage::RenewFinalize(
257 self.on_renew_confirm(r, &counter_party).await?,
258 ))),
259 DlcMessage::RenewFinalize(r) => {
260 let revoke = self.on_renew_finalize(r, &counter_party).await?;
261 Ok(Some(DlcMessage::RenewRevoke(revoke)))
262 }
263 DlcMessage::RenewRevoke(r) => {
264 self.on_renew_revoke(r, &counter_party).await?;
265 Ok(None)
266 }
267 DlcMessage::CollaborativeCloseOffer(c) => {
268 self.on_collaborative_close_offer(c, &counter_party).await?;
269 Ok(None)
270 }
271 DlcMessage::Reject(r) => {
272 self.on_reject(r, &counter_party).await?;
273 Ok(None)
274 }
275 }
276 }
277
278 pub async fn send_splice_offer(
280 &self,
281 contract_input: &ContractInput,
282 counter_party: PublicKey,
283 contract_id: &ContractId,
284 ) -> Result<OfferDlc, Error> {
285 let oracle_announcements = self.oracle_announcements(contract_input).await?;
286
287 self.send_splice_offer_with_announcements(
288 contract_input,
289 counter_party,
290 contract_id,
291 oracle_announcements,
292 )
293 .await
294 }
295
296 pub async fn send_splice_offer_with_announcements(
299 &self,
300 contract_input: &ContractInput,
301 counter_party: PublicKey,
302 contract_id: &ContractId,
303 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
304 ) -> Result<OfferDlc, Error> {
305 let confirmed_contract =
306 get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
307
308 let dlc_input = confirmed_contract.get_dlc_input();
309
310 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
311 &self.secp,
312 contract_input,
313 oracle_announcements,
314 vec![dlc_input],
315 REFUND_DELAY,
316 &counter_party,
317 &self.wallet,
318 &self.blockchain,
319 &self.time,
320 &self.signer_provider,
321 )
322 .await?;
323
324 offered_contract.validate()?;
325
326 self.store.create_contract(&offered_contract).await?;
327
328 Ok(offer_msg)
329 }
330
331 pub async fn send_offer(
336 &self,
337 contract_input: &ContractInput,
338 counter_party: PublicKey,
339 ) -> Result<OfferDlc, Error> {
340 let oracle_announcements = self.oracle_announcements(contract_input).await?;
342
343 self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
344 .await
345 }
346
347 pub async fn send_offer_with_announcements(
353 &self,
354 contract_input: &ContractInput,
355 counter_party: PublicKey,
356 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
357 ) -> Result<OfferDlc, Error> {
358 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
359 &self.secp,
360 contract_input,
361 oracle_announcements,
362 vec![],
363 REFUND_DELAY,
364 &counter_party,
365 &self.wallet,
366 &self.blockchain,
367 &self.time,
368 &self.signer_provider,
369 )
370 .await?;
371
372 offered_contract.validate()?;
373
374 self.store.create_contract(&offered_contract).await?;
375
376 Ok(offer_msg)
377 }
378
379 pub async fn accept_contract_offer(
381 &self,
382 contract_id: &ContractId,
383 ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
384 let offered_contract =
385 get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
386
387 let counter_party = offered_contract.counter_party;
388
389 let (accepted_contract, accept_msg) = accept_contract(
390 &self.secp,
391 &offered_contract,
392 &self.wallet,
393 &self.signer_provider,
394 &self.blockchain,
395 )
396 .await?;
397
398 self.wallet.import_address(&Address::p2wsh(
399 &accepted_contract.dlc_transactions.funding_script_pubkey,
400 self.blockchain.get_network()?,
401 ))?;
402
403 let contract_id = accepted_contract.get_contract_id();
404
405 self.store
406 .update_contract(&Contract::Accepted(accepted_contract))
407 .await?;
408
409 Ok((contract_id, counter_party, accept_msg))
410 }
411
412 pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
418 let cur_height = self.blockchain.get_blockchain_height().await?;
419 let last_height = self.chain_monitor.lock().await.last_height;
420
421 if cur_height < last_height {
424 return Err(Error::InvalidState(
425 "Current height is lower than last height.".to_string(),
426 ));
427 }
428
429 for height in last_height + 1..=cur_height {
430 let block = self.blockchain.get_block_at_height(height).await?;
431
432 self.chain_monitor
433 .lock()
434 .await
435 .process_block(&block, height);
436 }
437
438 Ok(())
439 }
440
441 pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
444 tracing::debug!("Periodic check.");
445 let signed_contracts = self.check_for_spliced_contract().await?;
446 self.check_signed_contracts(&signed_contracts).await?;
447 self.check_confirmed_contracts().await?;
448 self.check_preclosed_contracts().await?;
449
450 if check_channels {
451 self.channel_checks().await?;
452 }
453
454 Ok(())
455 }
456
457 async fn on_offer_message(
458 &self,
459 offered_message: &OfferDlc,
460 counter_party: PublicKey,
461 ) -> Result<(), Error> {
462 offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
463 let keys_id = self
464 .signer_provider
465 .derive_signer_key_id(false, offered_message.temporary_contract_id);
466 let contract: OfferedContract =
467 OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
468 contract.validate()?;
469
470 if self.store.get_contract(&contract.id).await?.is_some() {
471 return Err(Error::InvalidParameters(
472 "Contract with identical id already exists".to_string(),
473 ));
474 }
475
476 self.store.create_contract(&contract).await?;
477
478 Ok(())
479 }
480
481 async fn on_close_message(
482 &self,
483 close_msg: &CloseDlc,
484 counter_party: &PublicKey,
485 ) -> Result<(), Error> {
486 let signed_contract = get_contract_in_state!(
488 self,
489 &close_msg.contract_id,
490 Confirmed,
491 Some(*counter_party)
492 )?;
493
494 let _close_tx = crate::contract_updater::complete_cooperative_close(
497 &self.secp,
498 &signed_contract,
499 close_msg,
500 &self.signer_provider,
501 )?;
502
503 Ok(())
506 }
507
508 async fn on_accept_message(
509 &self,
510 accept_msg: &AcceptDlc,
511 counter_party: &PublicKey,
512 ) -> Result<DlcMessage, Error> {
513 let offered_contract = get_contract_in_state!(
514 self,
515 &accept_msg.temporary_contract_id,
516 Offered,
517 Some(*counter_party)
518 )?;
519
520 let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
521 &self.secp,
522 &offered_contract,
523 accept_msg,
524 &self.wallet,
525 &self.signer_provider,
526 &self.store,
527 )
528 .await
529 {
530 Ok(contract) => contract,
531 Err(e) => {
532 return self
533 .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
534 .await
535 }
536 };
537
538 self.wallet.import_address(&Address::p2wsh(
539 &signed_contract
540 .accepted_contract
541 .dlc_transactions
542 .funding_script_pubkey,
543 self.blockchain.get_network()?,
544 ))?;
545
546 self.store
547 .update_contract(&Contract::Signed(signed_contract))
548 .await?;
549
550 Ok(DlcMessage::Sign(signed_msg))
551 }
552
553 async fn on_sign_message(
554 &self,
555 sign_message: &SignDlc,
556 peer_id: &PublicKey,
557 ) -> Result<(), Error> {
558 let accepted_contract =
559 get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
560
561 let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
562 &self.secp,
563 &accepted_contract,
564 sign_message,
565 &self.wallet,
566 &self.store,
567 &self.signer_provider,
568 )
569 .await
570 {
571 Ok(contract) => contract,
572 Err(e) => {
573 return self
574 .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
575 .await
576 }
577 };
578
579 self.store
580 .update_contract(&Contract::Signed(signed_contract.clone()))
581 .await?;
582
583 self.blockchain.send_transaction(&fund_tx).await?;
584
585 if accepted_contract
588 .offered_contract
589 .funding_inputs
590 .iter()
591 .any(|c| c.dlc_input.is_some())
592 {
593 let Some(dlc_input) = accepted_contract
594 .offered_contract
595 .funding_inputs
596 .iter()
597 .find(|c| c.dlc_input.is_some())
598 else {
599 return Ok(());
600 };
601
602 let preclosed_contract = get_contract_in_state!(
603 self,
604 &dlc_input.dlc_input.as_ref().unwrap().contract_id,
605 Confirmed,
606 None as Option<PublicKey>
607 )?;
608
609 let preclosed_contract = PreClosedContract {
610 signed_contract: preclosed_contract.clone(),
611 attestations: None,
612 signed_cet: signed_contract
613 .accepted_contract
614 .dlc_transactions
615 .fund
616 .clone(),
617 };
618
619 self.store
620 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
621 .await?;
622 }
623
624 Ok(())
625 }
626
627 async fn get_oracle_announcements(
628 &self,
629 oracle_inputs: &OracleInput,
630 ) -> Result<Vec<OracleAnnouncement>, Error> {
631 let mut announcements = Vec::new();
632 for pubkey in &oracle_inputs.public_keys {
633 let oracle = self
634 .oracles
635 .get(pubkey)
636 .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
637 let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
638 announcements.push(announcement);
639 }
640
641 Ok(announcements)
642 }
643
644 async fn sign_fail_on_error<R>(
645 &self,
646 accepted_contract: AcceptedContract,
647 sign_message: SignDlc,
648 e: Error,
649 ) -> Result<R, Error> {
650 tracing::error!("Error in on_sign {}", e);
651 self.store
652 .update_contract(&Contract::FailedSign(FailedSignContract {
653 accepted_contract,
654 sign_message,
655 error_message: e.to_string(),
656 }))
657 .await?;
658 Err(e)
659 }
660
661 async fn accept_fail_on_error<R>(
662 &self,
663 offered_contract: OfferedContract,
664 accept_message: AcceptDlc,
665 e: Error,
666 ) -> Result<R, Error> {
667 tracing::error!("Error in on_accept {}", e);
668 self.store
669 .update_contract(&Contract::FailedAccept(FailedAcceptContract {
670 offered_contract,
671 accept_message,
672 error_message: e.to_string(),
673 }))
674 .await?;
675 Err(e)
676 }
677
678 async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
679 let confirmations = self
680 .blockchain
681 .get_transaction_confirmations(
682 &contract
683 .accepted_contract
684 .dlc_transactions
685 .fund
686 .compute_txid(),
687 )
688 .await?;
689 if confirmations >= NB_CONFIRMATIONS {
690 tracing::info!(
691 confirmations,
692 contract_id = contract.accepted_contract.get_contract_id_string(),
693 "Marking contract as confirmed."
694 );
695 self.store
696 .update_contract(&Contract::Confirmed(contract.clone()))
697 .await?;
698 } else {
699 tracing::info!(
700 confirmations,
701 required = NB_CONFIRMATIONS,
702 contract_id = contract.accepted_contract.get_contract_id_string(),
703 "Not enough confirmations to mark contract as confirmed."
704 );
705 }
706 Ok(())
707 }
708
709 async fn check_signed_contracts(
710 &self,
711 signed_contracts: &[SignedContract],
712 ) -> Result<(), Error> {
713 for c in signed_contracts {
714 if let Err(e) = self.check_signed_contract(c).await {
715 tracing::error!(
716 "Error checking confirmed contract {}: {}",
717 c.accepted_contract.get_contract_id_string(),
718 e
719 )
720 }
721 }
722
723 Ok(())
724 }
725
726 async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
727 let contracts = self.get_store().get_signed_contracts().await?;
728 for contract in &contracts {
729 let dlc_input = contract
730 .accepted_contract
731 .offered_contract
732 .funding_inputs
733 .iter()
734 .find(|d| d.dlc_input.is_some());
735
736 let Some(funding_input) = dlc_input else {
737 continue;
738 };
739
740 let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
741
742 let confirmed_contract_in_splice = match get_contract_in_state!(
743 self,
744 &contract_id,
745 Confirmed,
746 None as Option<PublicKey>
747 ) {
748 Ok(c) => Ok(c),
749 Err(Error::InvalidState(_)) => continue,
750 Err(e) => {
751 tracing::debug!("The previouse contract referenced in a splice transaction failed to retrieve. error={}", e.to_string());
752 Err(e)
753 }
754 }?;
755
756 let preclosed_contract = PreClosedContract {
757 signed_contract: confirmed_contract_in_splice.clone(),
758 attestations: None,
759 signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
760 };
761
762 self.store
763 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
764 .await?;
765 }
766 Ok(contracts)
767 }
768
769 async fn check_confirmed_contracts(&self) -> Result<(), Error> {
770 for c in self.store.get_confirmed_contracts().await? {
771 if c.channel_id.is_some() {
773 continue;
774 }
775 if let Err(e) = self.check_confirmed_contract(&c).await {
776 tracing::error!(
777 "Error checking confirmed contract {}: {}",
778 c.accepted_contract.get_contract_id_string(),
779 e
780 )
781 }
782 }
783
784 Ok(())
785 }
786
787 async fn get_closable_contract_info<'a>(
788 &'a self,
789 contract: &'a SignedContract,
790 ) -> ClosableContractInfo<'a> {
791 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
792 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
793 for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
794 let matured: Vec<_> = contract_info
795 .oracle_announcements
796 .iter()
797 .filter(|x| {
798 (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
799 })
800 .enumerate()
801 .collect();
802 if matured.len() >= contract_info.threshold {
803 let attestations = stream::iter(matured.iter())
804 .map(|(i, announcement)| async move {
805 let oracle = match self.oracles.get(&announcement.oracle_public_key) {
807 Some(oracle) => oracle,
808 None => {
809 tracing::debug!(
810 "Oracle not found for key: {}",
811 announcement.oracle_public_key
812 );
813 return None;
814 }
815 };
816
817 let attestation = match oracle
819 .get_attestation(&announcement.oracle_event.event_id)
820 .await
821 {
822 Ok(attestation) => attestation,
823 Err(e) => {
824 tracing::error!(
825 "Attestation not found for event. id={} error={}",
826 announcement.oracle_event.event_id,
827 e.to_string()
828 );
829 return None;
830 }
831 };
832
833 if let Err(e) = attestation.validate(&self.secp, announcement) {
835 tracing::error!(
836 "Oracle attestation is not valid. pubkey={} event_id={}, error={:?}",
837 announcement.oracle_public_key,
838 announcement.oracle_event.event_id,
839 e
840 );
841 return None;
842 }
843
844 Some((*i, attestation))
845 })
846 .collect::<FuturesUnordered<_>>()
847 .await
848 .filter_map(|result| async move { result }) .collect::<Vec<_>>()
850 .await;
851 if attestations.len() >= contract_info.threshold {
852 return Some((contract_info, adaptor_info, attestations));
853 }
854 }
855 }
856 None
857 }
858
859 async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
860 let closable_contract_info = self.get_closable_contract_info(contract).await;
861 if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
862 let offer = &contract.accepted_contract.offered_contract;
863 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
864
865 if let Ok(cet) = crate::contract_updater::get_signed_cet(
871 &self.secp,
872 contract,
873 contract_info,
874 adaptor_info,
875 &attestations,
876 &signer,
877 ) {
878 match self
879 .close_contract(
880 contract,
881 cet,
882 attestations.iter().map(|x| x.1.clone()).collect(),
883 )
884 .await
885 {
886 Ok(closed_contract) => {
887 self.store.update_contract(&closed_contract).await?;
888 return Ok(());
889 }
890 Err(e) => {
891 tracing::warn!(
892 "Failed to close contract {}: {}",
893 contract.accepted_contract.get_contract_id_string(),
894 e
895 );
896 return Err(e);
897 }
898 }
899 }
900 }
901
902 for pending_close_tx in &contract
904 .accepted_contract
905 .dlc_transactions
906 .pending_close_txs
907 {
908 let confirmations = self
909 .blockchain
910 .get_transaction_confirmations(&pending_close_tx.compute_txid())
911 .await?;
912
913 if confirmations >= NB_CONFIRMATIONS {
914 let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
916 let closed_contract = ClosedContract {
917 attestations: None, signed_cet: Some(pending_close_tx.clone()), contract_id: contract.accepted_contract.get_contract_id(),
920 temporary_contract_id: contract.accepted_contract.offered_contract.id,
921 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
922 pnl,
923 funding_txid: contract
924 .accepted_contract
925 .dlc_transactions
926 .fund
927 .compute_txid(),
928 };
929
930 self.store
931 .update_contract(&Contract::Closed(closed_contract))
932 .await?;
933 break; } else if confirmations >= 1 {
935 let preclosed_contract = PreClosedContract {
937 signed_contract: contract.clone(),
938 attestations: None, signed_cet: pending_close_tx.clone(),
940 };
941
942 self.store
943 .update_contract(&Contract::PreClosed(preclosed_contract))
944 .await?;
945 break; }
947 }
948
949 self.check_refund(contract).await?;
950
951 Ok(())
952 }
953
954 pub async fn close_confirmed_contract(
956 &self,
957 contract_id: &ContractId,
958 attestations: Vec<(usize, OracleAttestation)>,
959 ) -> Result<Contract, Error> {
960 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
961 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
962 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
963
964 if let Some((contract_info, adaptor_info)) =
966 contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
967 let matches = attestations
968 .iter()
969 .filter(|(i, a)| {
970 c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
971 })
972 .count();
973
974 matches >= c.threshold
975 })
976 {
977 let offer = &contract.accepted_contract.offered_contract;
978 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
979 let cet = crate::contract_updater::get_signed_cet(
980 &self.secp,
981 &contract,
982 contract_info,
983 adaptor_info,
984 &attestations,
985 &signer,
986 )?;
987
988 let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
990 .expect("Time is not in valid range. This should never happen.");
991 let height =
992 Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
993 .expect("Height is not in valid range. This should never happen.");
994 let locktime = cet.lock_time;
995
996 if !locktime.is_satisfied_by(height, time) {
997 return Err(Error::InvalidState(
998 "CET lock time has not passed yet".to_string(),
999 ));
1000 }
1001
1002 match self
1003 .close_contract(
1004 &contract,
1005 cet,
1006 attestations.into_iter().map(|x| x.1).collect(),
1007 )
1008 .await
1009 {
1010 Ok(closed_contract) => {
1011 self.store.update_contract(&closed_contract).await?;
1012 Ok(closed_contract)
1013 }
1014 Err(e) => {
1015 tracing::warn!(
1016 "Failed to close contract {}: {e}",
1017 contract.accepted_contract.get_contract_id_string()
1018 );
1019 Err(e)
1020 }
1021 }
1022 } else {
1023 Err(Error::InvalidState(
1024 "Attestations did not match contract infos".to_string(),
1025 ))
1026 }
1027 }
1028
1029 async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1030 for c in self.store.get_preclosed_contracts().await? {
1031 if let Err(e) = self.check_preclosed_contract(&c).await {
1032 tracing::error!(
1033 "Error checking pre-closed contract {}: {}",
1034 c.signed_contract.accepted_contract.get_contract_id_string(),
1035 e
1036 )
1037 }
1038 }
1039
1040 Ok(())
1041 }
1042
1043 async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1044 let broadcasted_txid = contract.signed_cet.compute_txid();
1045 let confirmations = self
1046 .blockchain
1047 .get_transaction_confirmations(&broadcasted_txid)
1048 .await?;
1049 if confirmations >= NB_CONFIRMATIONS {
1050 let pnl = contract
1051 .signed_contract
1052 .accepted_contract
1053 .compute_pnl(&contract.signed_cet);
1054
1055 let closed_contract = ClosedContract {
1056 attestations: contract.attestations.clone(),
1057 signed_cet: Some(contract.signed_cet.clone()),
1058 contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1059 temporary_contract_id: contract
1060 .signed_contract
1061 .accepted_contract
1062 .offered_contract
1063 .id,
1064 counter_party_id: contract
1065 .signed_contract
1066 .accepted_contract
1067 .offered_contract
1068 .counter_party,
1069 funding_txid: contract
1070 .signed_contract
1071 .accepted_contract
1072 .dlc_transactions
1073 .fund
1074 .compute_txid(),
1075 pnl,
1076 };
1077 self.store
1078 .update_contract(&Contract::Closed(closed_contract))
1079 .await?;
1080 }
1081
1082 Ok(())
1083 }
1084
1085 async fn close_contract(
1086 &self,
1087 contract: &SignedContract,
1088 signed_cet: Transaction,
1089 attestations: Vec<OracleAttestation>,
1090 ) -> Result<Contract, Error> {
1091 let confirmations = self
1092 .blockchain
1093 .get_transaction_confirmations(&signed_cet.compute_txid())
1094 .await?;
1095
1096 if confirmations < 1 {
1097 tracing::info!(
1098 txid = signed_cet.compute_txid().to_string(),
1099 "Broadcasting signed CET."
1100 );
1101 self.blockchain.send_transaction(&signed_cet).await?;
1106
1107 let preclosed_contract = PreClosedContract {
1108 signed_contract: contract.clone(),
1109 attestations: Some(attestations),
1110 signed_cet,
1111 };
1112
1113 return Ok(Contract::PreClosed(preclosed_contract));
1114 } else if confirmations < NB_CONFIRMATIONS {
1115 let preclosed_contract = PreClosedContract {
1116 signed_contract: contract.clone(),
1117 attestations: Some(attestations),
1118 signed_cet,
1119 };
1120
1121 return Ok(Contract::PreClosed(preclosed_contract));
1122 }
1123
1124 let closed_contract = ClosedContract {
1125 attestations: Some(attestations.to_vec()),
1126 pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1127 signed_cet: Some(signed_cet),
1128 contract_id: contract.accepted_contract.get_contract_id(),
1129 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1130 funding_txid: contract
1131 .accepted_contract
1132 .dlc_transactions
1133 .fund
1134 .compute_txid(),
1135 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1136 };
1137
1138 Ok(Contract::Closed(closed_contract))
1139 }
1140
1141 async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1143 if contract
1145 .accepted_contract
1146 .dlc_transactions
1147 .refund
1148 .lock_time
1149 .to_consensus_u32() as u64
1150 <= self.time.unix_time_now()
1151 {
1152 let accepted_contract = &contract.accepted_contract;
1153 let refund = accepted_contract.dlc_transactions.refund.clone();
1154 let confirmations = self
1155 .blockchain
1156 .get_transaction_confirmations(&refund.compute_txid())
1157 .await?;
1158 if confirmations == 0 {
1159 let offer = &contract.accepted_contract.offered_contract;
1160 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1161 let refund =
1162 crate::contract_updater::get_signed_refund(&self.secp, contract, &signer)?;
1163 self.blockchain.send_transaction(&refund).await?;
1164 }
1165
1166 self.store
1167 .update_contract(&Contract::Refunded(contract.clone()))
1168 .await?;
1169 }
1170
1171 Ok(())
1172 }
1173
1174 pub async fn on_counterparty_close(
1177 &mut self,
1178 contract: &SignedContract,
1179 closing_tx: Transaction,
1180 confirmations: u32,
1181 ) -> Result<Contract, Error> {
1182 if !closing_tx.input.iter().any(|i| {
1184 i.previous_output
1185 == contract
1186 .accepted_contract
1187 .dlc_transactions
1188 .get_fund_outpoint()
1189 }) {
1190 return Err(Error::InvalidParameters(
1191 "Closing tx does not spend the funding tx".to_string(),
1192 ));
1193 }
1194
1195 if contract
1197 .accepted_contract
1198 .dlc_transactions
1199 .refund
1200 .compute_txid()
1201 == closing_tx.compute_txid()
1202 {
1203 let refunded = Contract::Refunded(contract.clone());
1204 self.store.update_contract(&refunded).await?;
1205 return Ok(refunded);
1206 }
1207
1208 let contract = if confirmations < NB_CONFIRMATIONS {
1209 Contract::PreClosed(PreClosedContract {
1210 signed_contract: contract.clone(),
1211 attestations: None, signed_cet: closing_tx,
1213 })
1214 } else {
1215 Contract::Closed(ClosedContract {
1216 attestations: None, pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1218 signed_cet: Some(closing_tx),
1219 contract_id: contract.accepted_contract.get_contract_id(),
1220 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1221 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1222 funding_txid: contract
1223 .accepted_contract
1224 .dlc_transactions
1225 .fund
1226 .compute_txid(),
1227 })
1228 };
1229
1230 self.store.update_contract(&contract).await?;
1231
1232 Ok(contract)
1233 }
1234
1235 pub async fn cooperative_close_contract(
1239 &self,
1240 contract_id: &ContractId,
1241 counter_payout: Amount,
1242 ) -> Result<(CloseDlc, PublicKey), Error> {
1243 let signed_contract =
1244 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1245
1246 let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1247 &self.secp,
1248 &signed_contract,
1249 counter_payout,
1250 &self.signer_provider,
1251 )?;
1252
1253 let mut updated_dlc_transactions =
1255 signed_contract.accepted_contract.dlc_transactions.clone();
1256 updated_dlc_transactions
1257 .pending_close_txs
1258 .push(close_tx.clone());
1259
1260 let updated_accepted_contract = AcceptedContract {
1261 dlc_transactions: updated_dlc_transactions,
1262 ..signed_contract.accepted_contract.clone()
1263 };
1264
1265 let updated_signed_contract = SignedContract {
1266 accepted_contract: updated_accepted_contract,
1267 ..signed_contract.clone()
1268 };
1269
1270 self.store
1272 .update_contract(&Contract::Confirmed(updated_signed_contract))
1273 .await?;
1274
1275 let counter_party = signed_contract
1276 .accepted_contract
1277 .offered_contract
1278 .counter_party;
1279
1280 Ok((close_message, counter_party))
1281 }
1282
1283 pub async fn accept_cooperative_close(
1286 &self,
1287 contract_id: &ContractId,
1288 close_message: &CloseDlc,
1289 ) -> Result<(), Error> {
1290 let signed_contract =
1291 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1292
1293 let close_tx = crate::contract_updater::complete_cooperative_close(
1294 &self.secp,
1295 &signed_contract,
1296 close_message,
1297 &self.signer_provider,
1298 )?;
1299
1300 self.blockchain.send_transaction(&close_tx).await?;
1302
1303 let preclosed_contract = PreClosedContract {
1305 signed_contract,
1306 attestations: None,
1307 signed_cet: close_tx,
1308 };
1309
1310 self.store
1311 .update_contract(&Contract::PreClosed(preclosed_contract))
1312 .await?;
1313
1314 Ok(())
1315 }
1316}
1317
1318impl<W: Deref, SP: Deref, B: Deref, S: Deref, O: Deref, T: Deref, F: Deref, X: ContractSigner>
1319 Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X>
1320where
1321 W::Target: Wallet,
1322 SP::Target: ContractSignerProvider<Signer = X>,
1323 B::Target: Blockchain,
1324 S::Target: Storage,
1325 O::Target: Oracle,
1326 T::Target: Time,
1327 F::Target: FeeEstimator,
1328{
1329 pub async fn offer_channel(
1332 &self,
1333 contract_input: &ContractInput,
1334 counter_party: PublicKey,
1335 ) -> Result<OfferChannel, Error> {
1336 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1337
1338 let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1339 &self.secp,
1340 contract_input,
1341 &counter_party,
1342 &oracle_announcements,
1343 CET_NSEQUENCE,
1344 REFUND_DELAY,
1345 &self.wallet,
1346 &self.signer_provider,
1347 &self.blockchain,
1348 &self.time,
1349 crate::utils::get_new_temporary_id(),
1350 )
1351 .await?;
1352
1353 let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1354
1355 self.store
1356 .upsert_channel(
1357 Channel::Offered(offered_channel),
1358 Some(Contract::Offered(offered_contract)),
1359 )
1360 .await?;
1361
1362 Ok(msg)
1363 }
1364
1365 pub async fn reject_channel(
1368 &self,
1369 channel_id: &ChannelId,
1370 ) -> Result<(Reject, PublicKey), Error> {
1371 let offered_channel =
1372 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1373
1374 if offered_channel.is_offer_party {
1375 return Err(Error::InvalidState(
1376 "Cannot reject channel initiated by us.".to_string(),
1377 ));
1378 }
1379
1380 let offered_contract = get_contract_in_state!(
1381 self,
1382 &offered_channel.offered_contract_id,
1383 Offered,
1384 None as Option<PublicKey>
1385 )?;
1386
1387 let counterparty = offered_channel.counter_party;
1388 self.store
1389 .upsert_channel(
1390 Channel::Cancelled(offered_channel),
1391 Some(Contract::Rejected(offered_contract)),
1392 )
1393 .await?;
1394
1395 let msg = Reject {
1396 channel_id: *channel_id,
1397 };
1398 Ok((msg, counterparty))
1399 }
1400
1401 pub async fn accept_channel(
1405 &self,
1406 channel_id: &ChannelId,
1407 ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1408 let offered_channel =
1409 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1410
1411 if offered_channel.is_offer_party {
1412 return Err(Error::InvalidState(
1413 "Cannot accept channel initiated by us.".to_string(),
1414 ));
1415 }
1416
1417 let offered_contract = get_contract_in_state!(
1418 self,
1419 &offered_channel.offered_contract_id,
1420 Offered,
1421 None as Option<PublicKey>
1422 )?;
1423
1424 let (accepted_channel, accepted_contract, accept_channel) =
1425 crate::channel_updater::accept_channel_offer(
1426 &self.secp,
1427 &offered_channel,
1428 &offered_contract,
1429 &self.wallet,
1430 &self.signer_provider,
1431 &self.blockchain,
1432 )
1433 .await?;
1434
1435 self.wallet.import_address(&Address::p2wsh(
1436 &accepted_contract.dlc_transactions.funding_script_pubkey,
1437 self.blockchain.get_network()?,
1438 ))?;
1439
1440 let channel_id = accepted_channel.channel_id;
1441 let contract_id = accepted_contract.get_contract_id();
1442 let counter_party = accepted_contract.offered_contract.counter_party;
1443
1444 self.store
1445 .upsert_channel(
1446 Channel::Accepted(accepted_channel),
1447 Some(Contract::Accepted(accepted_contract)),
1448 )
1449 .await?;
1450
1451 Ok((accept_channel, channel_id, contract_id, counter_party))
1452 }
1453
1454 pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1456 let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1457
1458 self.force_close_channel_internal(channel, true).await
1459 }
1460
1461 pub async fn settle_offer(
1465 &self,
1466 channel_id: &ChannelId,
1467 counter_payout: Amount,
1468 ) -> Result<(SettleOffer, PublicKey), Error> {
1469 let mut signed_channel =
1470 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1471
1472 let msg = crate::channel_updater::settle_channel_offer(
1473 &self.secp,
1474 &mut signed_channel,
1475 counter_payout,
1476 PEER_TIMEOUT,
1477 &self.signer_provider,
1478 &self.time,
1479 )?;
1480
1481 let counter_party = signed_channel.counter_party;
1482
1483 self.store
1484 .upsert_channel(Channel::Signed(signed_channel), None)
1485 .await?;
1486
1487 Ok((msg, counter_party))
1488 }
1489
1490 pub async fn accept_settle_offer(
1493 &self,
1494 channel_id: &ChannelId,
1495 ) -> Result<(SettleAccept, PublicKey), Error> {
1496 let mut signed_channel =
1497 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1498
1499 let msg = crate::channel_updater::settle_channel_accept(
1500 &self.secp,
1501 &mut signed_channel,
1502 CET_NSEQUENCE,
1503 0,
1504 PEER_TIMEOUT,
1505 &self.signer_provider,
1506 &self.time,
1507 &self.chain_monitor,
1508 )
1509 .await?;
1510
1511 let counter_party = signed_channel.counter_party;
1512
1513 self.store
1514 .upsert_channel(Channel::Signed(signed_channel), None)
1515 .await?;
1516
1517 Ok((msg, counter_party))
1518 }
1519
1520 pub async fn renew_offer(
1524 &self,
1525 channel_id: &ChannelId,
1526 counter_payout: Amount,
1527 contract_input: &ContractInput,
1528 ) -> Result<(RenewOffer, PublicKey), Error> {
1529 let mut signed_channel =
1530 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1531
1532 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1533
1534 let (msg, offered_contract) = crate::channel_updater::renew_offer(
1535 &self.secp,
1536 &mut signed_channel,
1537 contract_input,
1538 oracle_announcements,
1539 counter_payout,
1540 REFUND_DELAY,
1541 PEER_TIMEOUT,
1542 CET_NSEQUENCE,
1543 &self.signer_provider,
1544 &self.time,
1545 )?;
1546
1547 let counter_party = offered_contract.counter_party;
1548
1549 self.store
1550 .upsert_channel(
1551 Channel::Signed(signed_channel),
1552 Some(Contract::Offered(offered_contract)),
1553 )
1554 .await?;
1555
1556 Ok((msg, counter_party))
1557 }
1558
1559 pub async fn accept_renew_offer(
1563 &self,
1564 channel_id: &ChannelId,
1565 ) -> Result<(RenewAccept, PublicKey), Error> {
1566 let mut signed_channel =
1567 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1568 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1569 Error::InvalidState("Expected to have a contract id but did not.".to_string())
1570 })?;
1571
1572 let offered_contract = get_contract_in_state!(
1573 self,
1574 &offered_contract_id,
1575 Offered,
1576 None as Option<PublicKey>
1577 )?;
1578
1579 let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1580 &self.secp,
1581 &mut signed_channel,
1582 &offered_contract,
1583 CET_NSEQUENCE,
1584 PEER_TIMEOUT,
1585 &self.signer_provider,
1586 &self.time,
1587 )?;
1588
1589 let counter_party = signed_channel.counter_party;
1590
1591 self.store
1592 .upsert_channel(
1593 Channel::Signed(signed_channel),
1594 Some(Contract::Accepted(accepted_contract)),
1595 )
1596 .await?;
1597
1598 Ok((msg, counter_party))
1599 }
1600
1601 pub async fn reject_renew_offer(
1605 &self,
1606 channel_id: &ChannelId,
1607 ) -> Result<(Reject, PublicKey), Error> {
1608 let mut signed_channel =
1609 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1610 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1611 Error::InvalidState(
1612 "Expected to be in a state with an associated contract id but was not.".to_string(),
1613 )
1614 })?;
1615
1616 let offered_contract = get_contract_in_state!(
1617 self,
1618 &offered_contract_id,
1619 Offered,
1620 None as Option<PublicKey>
1621 )?;
1622
1623 let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1624
1625 let counter_party = signed_channel.counter_party;
1626
1627 self.store
1628 .upsert_channel(
1629 Channel::Signed(signed_channel),
1630 Some(Contract::Rejected(offered_contract)),
1631 )
1632 .await?;
1633
1634 Ok((reject_msg, counter_party))
1635 }
1636
1637 pub async fn reject_settle_offer(
1641 &self,
1642 channel_id: &ChannelId,
1643 ) -> Result<(Reject, PublicKey), Error> {
1644 let mut signed_channel =
1645 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1646
1647 let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1648
1649 let counter_party = signed_channel.counter_party;
1650
1651 self.store
1652 .upsert_channel(Channel::Signed(signed_channel), None)
1653 .await?;
1654
1655 Ok((msg, counter_party))
1656 }
1657
1658 pub async fn offer_collaborative_close(
1663 &self,
1664 channel_id: &ChannelId,
1665 counter_payout: Amount,
1666 additional_inputs: Vec<OutPoint>,
1667 ) -> Result<CollaborativeCloseOffer, Error> {
1668 let mut signed_channel =
1669 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1670
1671 let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
1672 &self.secp,
1673 &mut signed_channel,
1674 counter_payout,
1675 additional_inputs,
1676 &self.signer_provider,
1677 &self.time,
1678 )?;
1679
1680 self.chain_monitor.lock().await.add_tx(
1681 close_tx.compute_txid(),
1682 ChannelInfo {
1683 channel_id: *channel_id,
1684 tx_type: TxType::CollaborativeClose,
1685 },
1686 );
1687
1688 self.store
1689 .upsert_channel(Channel::Signed(signed_channel), None)
1690 .await?;
1691 self.store
1692 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1693 .await?;
1694
1695 Ok(msg)
1696 }
1697
1698 pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
1701 let signed_channel =
1702 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1703
1704 let closed_contract = if let Some(SignedChannelState::Established {
1705 signed_contract_id,
1706 ..
1707 }) = &signed_channel.roll_back_state
1708 {
1709 let counter_payout = get_signed_channel_state!(
1710 signed_channel,
1711 CollaborativeCloseOffered,
1712 counter_payout
1713 )?;
1714 Some(
1715 self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
1716 .await?,
1717 )
1718 } else {
1719 None
1720 };
1721
1722 let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
1723 &self.secp,
1724 &signed_channel,
1725 &self.signer_provider,
1726 )?;
1727
1728 self.blockchain.send_transaction(&close_tx).await?;
1729
1730 self.store.upsert_channel(closed_channel, None).await?;
1731
1732 if let Some(closed_contract) = closed_contract {
1733 self.store
1734 .update_contract(&Contract::Closed(closed_contract))
1735 .await?;
1736 }
1737
1738 Ok(())
1739 }
1740
1741 async fn try_finalize_closing_established_channel(
1742 &self,
1743 signed_channel: SignedChannel,
1744 ) -> Result<(), Error> {
1745 let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
1746 signed_channel,
1747 Closing,
1748 buffer_transaction,
1749 contract_id,
1750 is_initiator
1751 )?;
1752
1753 if self
1754 .blockchain
1755 .get_transaction_confirmations(&buffer_tx.compute_txid())
1756 .await?
1757 >= CET_NSEQUENCE
1758 {
1759 tracing::info!(
1760 "Buffer transaction for contract {} has enough confirmations to spend from it",
1761 serialize_hex(&contract_id)
1762 );
1763
1764 let confirmed_contract =
1765 get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
1766
1767 let (contract_info, adaptor_info, attestations) = self
1768 .get_closable_contract_info(&confirmed_contract)
1769 .await
1770 .ok_or_else(|| {
1771 Error::InvalidState("Could not get information to close contract".to_string())
1772 })?;
1773
1774 let (signed_cet, closed_channel) =
1775 crate::channel_updater::finalize_unilateral_close_settled_channel(
1776 &self.secp,
1777 &signed_channel,
1778 &confirmed_contract,
1779 contract_info,
1780 &attestations,
1781 adaptor_info,
1782 &self.signer_provider,
1783 is_initiator,
1784 )?;
1785
1786 let closed_contract = self
1787 .close_contract(
1788 &confirmed_contract,
1789 signed_cet,
1790 attestations.iter().map(|x| &x.1).cloned().collect(),
1791 )
1792 .await?;
1793
1794 self.chain_monitor
1795 .lock()
1796 .await
1797 .cleanup_channel(signed_channel.channel_id);
1798
1799 self.store
1800 .upsert_channel(closed_channel, Some(closed_contract))
1801 .await?;
1802 }
1803
1804 Ok(())
1805 }
1806
1807 async fn on_offer_channel(
1808 &self,
1809 offer_channel: &OfferChannel,
1810 counter_party: PublicKey,
1811 ) -> Result<(), Error> {
1812 offer_channel.validate(
1813 &self.secp,
1814 REFUND_DELAY,
1815 REFUND_DELAY * 2,
1816 CET_NSEQUENCE,
1817 CET_NSEQUENCE * 2,
1818 )?;
1819
1820 let keys_id = self
1821 .signer_provider
1822 .derive_signer_key_id(false, offer_channel.temporary_contract_id);
1823 let (channel, contract) =
1824 OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
1825
1826 contract.validate()?;
1827
1828 if self
1829 .store
1830 .get_channel(&channel.temporary_channel_id)
1831 .await?
1832 .is_some()
1833 {
1834 return Err(Error::InvalidParameters(
1835 "Channel with identical idea already in store".to_string(),
1836 ));
1837 }
1838
1839 self.store
1840 .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
1841 .await?;
1842
1843 Ok(())
1844 }
1845
1846 async fn on_accept_channel(
1847 &self,
1848 accept_channel: &AcceptChannel,
1849 peer_id: &PublicKey,
1850 ) -> Result<SignChannel, Error> {
1851 let offered_channel = get_channel_in_state!(
1852 self,
1853 &accept_channel.temporary_channel_id,
1854 Offered,
1855 Some(*peer_id)
1856 )?;
1857 let offered_contract = get_contract_in_state!(
1858 self,
1859 &offered_channel.offered_contract_id,
1860 Offered,
1861 Some(*peer_id)
1862 )?;
1863
1864 let (signed_channel, signed_contract, sign_channel) = {
1865 let res = crate::channel_updater::verify_and_sign_accepted_channel(
1866 &self.secp,
1867 &offered_channel,
1868 &offered_contract,
1869 accept_channel,
1870 CET_NSEQUENCE,
1872 &self.wallet,
1873 &self.signer_provider,
1874 &self.store,
1875 &self.chain_monitor,
1876 )
1877 .await;
1878
1879 match res {
1880 Ok(res) => res,
1881 Err(e) => {
1882 let channel = crate::channel::FailedAccept {
1883 temporary_channel_id: accept_channel.temporary_channel_id,
1884 error_message: format!("Error validating accept channel: {e}"),
1885 accept_message: accept_channel.clone(),
1886 counter_party: *peer_id,
1887 };
1888 self.store
1889 .upsert_channel(Channel::FailedAccept(channel), None)
1890 .await?;
1891 return Err(e);
1892 }
1893 }
1894 };
1895
1896 self.wallet.import_address(&Address::p2wsh(
1897 &signed_contract
1898 .accepted_contract
1899 .dlc_transactions
1900 .funding_script_pubkey,
1901 self.blockchain.get_network()?,
1902 ))?;
1903
1904 if let SignedChannelState::Established {
1905 buffer_transaction, ..
1906 } = &signed_channel.state
1907 {
1908 self.chain_monitor.lock().await.add_tx(
1909 buffer_transaction.compute_txid(),
1910 ChannelInfo {
1911 channel_id: signed_channel.channel_id,
1912 tx_type: TxType::BufferTx,
1913 },
1914 );
1915 } else {
1916 unreachable!();
1917 }
1918
1919 self.store
1920 .upsert_channel(
1921 Channel::Signed(signed_channel),
1922 Some(Contract::Signed(signed_contract)),
1923 )
1924 .await?;
1925
1926 self.store
1927 .persist_chain_monitor(&*self.chain_monitor.lock().await)
1928 .await?;
1929
1930 Ok(sign_channel)
1931 }
1932
1933 async fn on_sign_channel(
1934 &self,
1935 sign_channel: &SignChannel,
1936 peer_id: &PublicKey,
1937 ) -> Result<(), Error> {
1938 let accepted_channel =
1939 get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
1940 let accepted_contract = get_contract_in_state!(
1941 self,
1942 &accepted_channel.accepted_contract_id,
1943 Accepted,
1944 Some(*peer_id)
1945 )?;
1946
1947 let (signed_channel, signed_contract, signed_fund_tx) = {
1948 let res = verify_signed_channel(
1949 &self.secp,
1950 &accepted_channel,
1951 &accepted_contract,
1952 sign_channel,
1953 &self.wallet,
1954 &self.chain_monitor,
1955 &self.store,
1956 &self.signer_provider,
1957 )
1958 .await;
1959
1960 match res {
1961 Ok(res) => res,
1962 Err(e) => {
1963 let channel = crate::channel::FailedSign {
1964 channel_id: sign_channel.channel_id,
1965 error_message: format!("Error validating accept channel: {e}"),
1966 sign_message: sign_channel.clone(),
1967 counter_party: *peer_id,
1968 };
1969 self.store
1970 .upsert_channel(Channel::FailedSign(channel), None)
1971 .await?;
1972 return Err(e);
1973 }
1974 }
1975 };
1976
1977 if let SignedChannelState::Established {
1978 buffer_transaction, ..
1979 } = &signed_channel.state
1980 {
1981 self.chain_monitor.lock().await.add_tx(
1982 buffer_transaction.compute_txid(),
1983 ChannelInfo {
1984 channel_id: signed_channel.channel_id,
1985 tx_type: TxType::BufferTx,
1986 },
1987 );
1988 } else {
1989 unreachable!();
1990 }
1991
1992 self.blockchain.send_transaction(&signed_fund_tx).await?;
1993
1994 self.store
1995 .upsert_channel(
1996 Channel::Signed(signed_channel),
1997 Some(Contract::Signed(signed_contract)),
1998 )
1999 .await?;
2000 self.store
2001 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2002 .await?;
2003
2004 Ok(())
2005 }
2006
2007 async fn on_settle_offer(
2008 &self,
2009 settle_offer: &SettleOffer,
2010 peer_id: &PublicKey,
2011 ) -> Result<Option<Reject>, Error> {
2012 let mut signed_channel =
2013 get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2014
2015 if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2016 return Ok(Some(Reject {
2017 channel_id: settle_offer.channel_id,
2018 }));
2019 }
2020
2021 crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2022
2023 self.store
2024 .upsert_channel(Channel::Signed(signed_channel), None)
2025 .await?;
2026
2027 Ok(None)
2028 }
2029
2030 async fn on_settle_accept(
2031 &self,
2032 settle_accept: &SettleAccept,
2033 peer_id: &PublicKey,
2034 ) -> Result<SettleConfirm, Error> {
2035 let mut signed_channel =
2036 get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2037
2038 let msg = crate::channel_updater::settle_channel_confirm(
2039 &self.secp,
2040 &mut signed_channel,
2041 settle_accept,
2042 CET_NSEQUENCE,
2043 0,
2044 PEER_TIMEOUT,
2045 &self.signer_provider,
2046 &self.time,
2047 &self.chain_monitor,
2048 )
2049 .await?;
2050
2051 self.store
2052 .upsert_channel(Channel::Signed(signed_channel), None)
2053 .await?;
2054
2055 Ok(msg)
2056 }
2057
2058 async fn on_settle_confirm(
2059 &self,
2060 settle_confirm: &SettleConfirm,
2061 peer_id: &PublicKey,
2062 ) -> Result<SettleFinalize, Error> {
2063 let mut signed_channel =
2064 get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2065 let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2066 let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2067 signed_channel,
2068 Established,
2069 buffer_transaction,
2070 own_buffer_adaptor_signature,
2071 is_offer,
2072 signed_contract_id
2073 )?;
2074
2075 let prev_buffer_txid = prev_buffer_tx.compute_txid();
2076 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2077 let is_offer = *is_offer;
2078 let signed_contract_id = *signed_contract_id;
2079
2080 let msg = crate::channel_updater::settle_channel_finalize(
2081 &self.secp,
2082 &mut signed_channel,
2083 settle_confirm,
2084 &self.signer_provider,
2085 )?;
2086
2087 self.chain_monitor.lock().await.add_tx(
2088 prev_buffer_txid,
2089 ChannelInfo {
2090 channel_id: signed_channel.channel_id,
2091 tx_type: TxType::Revoked {
2092 update_idx: signed_channel.update_idx + 1,
2093 own_adaptor_signature: own_buffer_adaptor_signature,
2094 is_offer,
2095 revoked_tx_type: RevokedTxType::Buffer,
2096 },
2097 },
2098 );
2099
2100 let closed_contract = Contract::Closed(
2101 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2102 .await?,
2103 );
2104
2105 self.store
2106 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2107 .await?;
2108 self.store
2109 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2110 .await?;
2111
2112 Ok(msg)
2113 }
2114
2115 async fn on_settle_finalize(
2116 &self,
2117 settle_finalize: &SettleFinalize,
2118 peer_id: &PublicKey,
2119 ) -> Result<(), Error> {
2120 let mut signed_channel =
2121 get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2122 let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2123 let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2124 signed_channel,
2125 Established,
2126 buffer_transaction,
2127 own_buffer_adaptor_signature,
2128 is_offer,
2129 signed_contract_id
2130 )?;
2131
2132 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2133 let is_offer = *is_offer;
2134 let buffer_txid = buffer_tx.compute_txid();
2135 let signed_contract_id = *signed_contract_id;
2136
2137 crate::channel_updater::settle_channel_on_finalize(
2138 &self.secp,
2139 &mut signed_channel,
2140 settle_finalize,
2141 )?;
2142
2143 self.chain_monitor.lock().await.add_tx(
2144 buffer_txid,
2145 ChannelInfo {
2146 channel_id: signed_channel.channel_id,
2147 tx_type: TxType::Revoked {
2148 update_idx: signed_channel.update_idx + 1,
2149 own_adaptor_signature: own_buffer_adaptor_signature,
2150 is_offer,
2151 revoked_tx_type: RevokedTxType::Buffer,
2152 },
2153 },
2154 );
2155
2156 let closed_contract = Contract::Closed(
2157 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2158 .await?,
2159 );
2160 self.store
2161 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2162 .await?;
2163 self.store
2164 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2165 .await?;
2166
2167 Ok(())
2168 }
2169
2170 async fn on_renew_offer(
2171 &self,
2172 renew_offer: &RenewOffer,
2173 peer_id: &PublicKey,
2174 ) -> Result<Option<Reject>, Error> {
2175 let mut signed_channel =
2176 get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2177
2178 if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2180 if is_offer {
2181 return Ok(Some(Reject {
2182 channel_id: renew_offer.channel_id,
2183 }));
2184 }
2185 }
2186
2187 let offered_contract = crate::channel_updater::on_renew_offer(
2188 &mut signed_channel,
2189 renew_offer,
2190 PEER_TIMEOUT,
2191 &self.time,
2192 )?;
2193
2194 self.store.create_contract(&offered_contract).await?;
2195 self.store
2196 .upsert_channel(Channel::Signed(signed_channel), None)
2197 .await?;
2198
2199 Ok(None)
2200 }
2201
2202 async fn on_renew_accept(
2203 &self,
2204 renew_accept: &RenewAccept,
2205 peer_id: &PublicKey,
2206 ) -> Result<RenewConfirm, Error> {
2207 let mut signed_channel =
2208 get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2209 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2210 Error::InvalidState(
2211 "Expected to be in a state with an associated contract id but was not.".to_string(),
2212 )
2213 })?;
2214
2215 let offered_contract =
2216 get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2217
2218 let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2219 &self.secp,
2220 renew_accept,
2221 &mut signed_channel,
2222 &offered_contract,
2223 CET_NSEQUENCE,
2224 PEER_TIMEOUT,
2225 &self.wallet,
2226 &self.signer_provider,
2227 &self.time,
2228 &self.store,
2229 )
2230 .await?;
2231
2232 self.store
2234 .upsert_channel(
2235 Channel::Signed(signed_channel),
2236 Some(Contract::Confirmed(signed_contract)),
2237 )
2238 .await?;
2239
2240 Ok(msg)
2241 }
2242
2243 async fn on_renew_confirm(
2244 &self,
2245 renew_confirm: &RenewConfirm,
2246 peer_id: &PublicKey,
2247 ) -> Result<RenewFinalize, Error> {
2248 let mut signed_channel =
2249 get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2250 let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2251 let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2252 Error::InvalidState(
2253 "Expected to be in a state with an associated contract id but was not.".to_string(),
2254 )
2255 })?;
2256
2257 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2258 .roll_back_state
2259 .as_ref()
2260 .expect("to have a rollback state")
2261 {
2262 SignedChannelState::Established {
2263 own_buffer_adaptor_signature,
2264 buffer_transaction,
2265 signed_contract_id,
2266 ..
2267 } => {
2268 let closed_contract = Contract::Closed(
2269 self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2270 .await?,
2271 );
2272 (
2273 TxType::Revoked {
2274 update_idx: signed_channel.update_idx,
2275 own_adaptor_signature: *own_buffer_adaptor_signature,
2276 is_offer: false,
2277 revoked_tx_type: RevokedTxType::Buffer,
2278 },
2279 buffer_transaction.compute_txid(),
2280 Some(closed_contract),
2281 )
2282 }
2283 SignedChannelState::Settled {
2284 settle_tx,
2285 own_settle_adaptor_signature,
2286 ..
2287 } => (
2288 TxType::Revoked {
2289 update_idx: signed_channel.update_idx,
2290 own_adaptor_signature: *own_settle_adaptor_signature,
2291 is_offer: false,
2292 revoked_tx_type: RevokedTxType::Settle,
2293 },
2294 settle_tx.compute_txid(),
2295 None,
2296 ),
2297 s => {
2298 return Err(Error::InvalidState(format!(
2299 "Expected rollback state Established or Revoked but found {s:?}"
2300 )))
2301 }
2302 };
2303 let accepted_contract =
2304 get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2305
2306 let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2307 &self.secp,
2308 &mut signed_channel,
2309 &accepted_contract,
2310 renew_confirm,
2311 PEER_TIMEOUT,
2312 &self.time,
2313 &self.wallet,
2314 &self.signer_provider,
2315 &self.chain_monitor,
2316 &self.store,
2317 )
2318 .await?;
2319
2320 self.chain_monitor.lock().await.add_tx(
2321 prev_tx_id,
2322 ChannelInfo {
2323 channel_id: signed_channel.channel_id,
2324 tx_type,
2325 },
2326 );
2327
2328 self.store
2330 .upsert_channel(
2331 Channel::Signed(signed_channel),
2332 Some(Contract::Confirmed(signed_contract)),
2333 )
2334 .await?;
2335
2336 self.store
2337 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2338 .await?;
2339
2340 if let Some(closed_contract) = closed_contract {
2341 self.store.update_contract(&closed_contract).await?;
2342 }
2343
2344 Ok(msg)
2345 }
2346
2347 async fn on_renew_finalize(
2348 &self,
2349 renew_finalize: &RenewFinalize,
2350 peer_id: &PublicKey,
2351 ) -> Result<RenewRevoke, Error> {
2352 let mut signed_channel =
2353 get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2354 let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2355
2356 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2357 .roll_back_state
2358 .as_ref()
2359 .expect("to have a rollback state")
2360 {
2361 SignedChannelState::Established {
2362 own_buffer_adaptor_signature,
2363 buffer_transaction,
2364 signed_contract_id,
2365 ..
2366 } => {
2367 let closed_contract = self
2368 .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2369 .await?;
2370 (
2371 TxType::Revoked {
2372 update_idx: signed_channel.update_idx,
2373 own_adaptor_signature: *own_buffer_adaptor_signature,
2374 is_offer: false,
2375 revoked_tx_type: RevokedTxType::Buffer,
2376 },
2377 buffer_transaction.compute_txid(),
2378 Some(Contract::Closed(closed_contract)),
2379 )
2380 }
2381 SignedChannelState::Settled {
2382 settle_tx,
2383 own_settle_adaptor_signature,
2384 ..
2385 } => (
2386 TxType::Revoked {
2387 update_idx: signed_channel.update_idx,
2388 own_adaptor_signature: *own_settle_adaptor_signature,
2389 is_offer: false,
2390 revoked_tx_type: RevokedTxType::Settle,
2391 },
2392 settle_tx.compute_txid(),
2393 None,
2394 ),
2395 s => {
2396 return Err(Error::InvalidState(format!(
2397 "Expected rollback state of Established or Settled but was {s:?}"
2398 )))
2399 }
2400 };
2401
2402 let msg = crate::channel_updater::renew_channel_on_finalize(
2403 &self.secp,
2404 &mut signed_channel,
2405 renew_finalize,
2406 &self.signer_provider,
2407 )?;
2408
2409 self.chain_monitor.lock().await.add_tx(
2410 prev_tx_id,
2411 ChannelInfo {
2412 channel_id: signed_channel.channel_id,
2413 tx_type,
2414 },
2415 );
2416
2417 let buffer_tx =
2418 get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2419
2420 self.chain_monitor.lock().await.add_tx(
2421 buffer_tx.compute_txid(),
2422 ChannelInfo {
2423 channel_id: signed_channel.channel_id,
2424 tx_type: TxType::BufferTx,
2425 },
2426 );
2427
2428 self.store
2429 .upsert_channel(Channel::Signed(signed_channel), None)
2430 .await?;
2431 self.store
2432 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2433 .await?;
2434
2435 if let Some(closed_contract) = closed_contract {
2436 self.store.update_contract(&closed_contract).await?;
2437 }
2438
2439 Ok(msg)
2440 }
2441
2442 async fn on_renew_revoke(
2443 &self,
2444 renew_revoke: &RenewRevoke,
2445 peer_id: &PublicKey,
2446 ) -> Result<(), Error> {
2447 let mut signed_channel =
2448 get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2449
2450 crate::channel_updater::renew_channel_on_revoke(
2451 &self.secp,
2452 &mut signed_channel,
2453 renew_revoke,
2454 )?;
2455
2456 self.store
2457 .upsert_channel(Channel::Signed(signed_channel), None)
2458 .await?;
2459
2460 Ok(())
2461 }
2462
2463 async fn on_collaborative_close_offer(
2464 &self,
2465 close_offer: &CollaborativeCloseOffer,
2466 peer_id: &PublicKey,
2467 ) -> Result<(), Error> {
2468 let mut signed_channel =
2469 get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2470
2471 crate::channel_updater::on_collaborative_close_offer(
2472 &mut signed_channel,
2473 close_offer,
2474 PEER_TIMEOUT,
2475 &self.time,
2476 )?;
2477
2478 self.store
2479 .upsert_channel(Channel::Signed(signed_channel), None)
2480 .await?;
2481
2482 Ok(())
2483 }
2484
2485 async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2486 let channel = self.store.get_channel(&reject.channel_id).await?;
2487
2488 if let Some(channel) = channel {
2489 if channel.get_counter_party_id() != *counter_party {
2490 return Err(Error::InvalidParameters(format!(
2491 "Peer {:02x?} is not involved with {} {:02x?}.",
2492 counter_party,
2493 stringify!(Channel),
2494 channel.get_id()
2495 )));
2496 }
2497 match channel {
2498 Channel::Offered(offered_channel) => {
2499 let offered_contract = get_contract_in_state!(
2500 self,
2501 &offered_channel.offered_contract_id,
2502 Offered,
2503 None as Option<PublicKey>
2504 )?;
2505 let utxos = offered_contract
2506 .funding_inputs
2507 .iter()
2508 .map(|funding_input| {
2509 let txid = Transaction::consensus_decode(
2510 &mut funding_input.prev_tx.as_slice(),
2511 )
2512 .expect("Transaction Decode Error")
2513 .compute_txid();
2514 let vout = funding_input.prev_tx_vout;
2515 OutPoint { txid, vout }
2516 })
2517 .collect::<Vec<_>>();
2518
2519 self.wallet.unreserve_utxos(&utxos)?;
2520
2521 self.store
2523 .upsert_channel(
2524 Channel::Cancelled(offered_channel),
2525 Some(Contract::Rejected(offered_contract)),
2526 )
2527 .await?;
2528 }
2529 Channel::Signed(mut signed_channel) => {
2530 let contract = match signed_channel.state {
2531 SignedChannelState::RenewOffered {
2532 offered_contract_id,
2533 ..
2534 } => {
2535 let offered_contract = get_contract_in_state!(
2536 self,
2537 &offered_contract_id,
2538 Offered,
2539 None::<PublicKey>
2540 )?;
2541 Some(Contract::Rejected(offered_contract))
2542 }
2543 _ => None,
2544 };
2545
2546 crate::channel_updater::on_reject(&mut signed_channel)?;
2547
2548 self.store
2549 .upsert_channel(Channel::Signed(signed_channel), contract)
2550 .await?;
2551 }
2552 channel => {
2553 return Err(Error::InvalidState(format!(
2554 "Not in a state adequate to receive a reject message. {channel:?}"
2555 )))
2556 }
2557 }
2558 } else {
2559 tracing::warn!(
2560 "Couldn't find rejected dlc channel with id: {}",
2561 reject.channel_id.to_lower_hex_string()
2562 );
2563 }
2564
2565 Ok(())
2566 }
2567
2568 async fn channel_checks(&self) -> Result<(), Error> {
2569 let established_closing_channels = self
2570 .store
2571 .get_signed_channels(Some(SignedChannelStateType::Closing))
2572 .await?;
2573
2574 for channel in established_closing_channels {
2575 if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2576 tracing::error!("Error trying to close established channel: {}", e);
2577 }
2578 }
2579
2580 if let Err(e) = self.check_for_timed_out_channels().await {
2581 tracing::error!("Error checking timed out channels {}", e);
2582 }
2583 self.check_for_watched_tx().await
2584 }
2585
/// Invokes the `check_for_timed_out_channels!` macro once for each of the
/// intermediate renew and settle states listed below.
///
/// NOTE(review): the macro is defined outside this section; presumably it
/// handles channels whose peer did not answer in time and may return early
/// with an error — confirm against the macro definition.
async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
    check_for_timed_out_channels!(self, RenewOffered);
    check_for_timed_out_channels!(self, RenewAccepted);
    check_for_timed_out_channels!(self, RenewConfirmed);
    check_for_timed_out_channels!(self, SettledOffered);
    check_for_timed_out_channels!(self, SettledAccepted);
    check_for_timed_out_channels!(self, SettledConfirmed);

    Ok(())
}
2596
/// Reacts to watched transactions, updating the channel each one belongs to.
///
/// For every `(tx, channel_info)` pair the channel is loaded in the `Signed`
/// state; if that fails the error is logged and the pair is skipped. The
/// reaction then depends on `channel_info.tx_type`:
///
/// * `BufferTx`: the channel is moved to `Closing` (with `is_initiator: false`)
///   and the previous state is kept as rollback state.
/// * `Revoked`: a punish transaction is built, signed and broadcast, and the
///   channel is stored as `ClosedPunished`.
/// * `CollaborativeClose`: the channel is stored as `CollaborativelyClosed`
///   (and the contract as `Closed` when the rollback state is `Established`).
/// * `SettleTx`: the channel is stored as `CounterClosed`.
/// * `Cet`: the channel is stored as `Closed` or `CounterClosed`, and a
///   `Confirmed` contract is stored as `PreClosed` with `tx` as its CET.
///
/// # Errors
///
/// Any error from the store, signer provider, wallet, blockchain or the
/// transaction builders aborts the whole loop via `?`.
///
/// # Panics
///
/// Panics if a `BufferTx` channel has no contract id, if the counter party's
/// per update secret cannot be retrieved or parsed, or if the derived own
/// per update secret is not a valid secret key. Indexing `tx.input[0]` and
/// removing the witness element also panic if the transaction does not have
/// the expected shape.
pub(crate) async fn process_watched_txs(
    &self,
    watched_txs: Vec<(Transaction, ChannelInfo)>,
) -> Result<(), Error> {
    for (tx, channel_info) in watched_txs {
        // A channel that cannot be loaded in the Signed state is logged and
        // skipped rather than failing the whole batch.
        let mut signed_channel = match get_channel_in_state!(
            self,
            &channel_info.channel_id,
            Signed,
            None as Option<PublicKey>
        ) {
            Ok(c) => c,
            Err(e) => {
                tracing::error!(
                    "Could not retrieve channel {:?}: {}",
                    channel_info.channel_id,
                    e
                );
                continue;
            }
        };

        // `persist` records whether the chain monitor was modified by the
        // branch and therefore needs to be written to the store afterwards.
        let persist = match channel_info.tx_type {
            TxType::BufferTx => {
                let contract_id = signed_channel
                    .get_contract_id()
                    .expect("to have a contract id");
                let mut state = SignedChannelState::Closing {
                    buffer_transaction: tx.clone(),
                    is_initiator: false,
                    contract_id,
                    keys_id: signed_channel.keys_id().ok_or_else(|| {
                        Error::InvalidState("Expected to have keys_id.".to_string())
                    })?,
                };
                // After the swap the channel holds the new Closing state and
                // `state` holds the previous one, kept below for rollback.
                std::mem::swap(&mut signed_channel.state, &mut state);

                signed_channel.roll_back_state = Some(state);

                self.store
                    .upsert_channel(Channel::Signed(signed_channel), None)
                    .await?;

                false
            }
            TxType::Revoked {
                update_idx,
                own_adaptor_signature,
                is_offer,
                revoked_tx_type,
            } => {
                // Counter party's per update secret for the revoked update.
                let secret = signed_channel
                    .counter_party_commitment_secrets
                    .get_secret(update_idx)
                    .expect("to be able to retrieve the per update secret");
                let counter_per_update_secret = SecretKey::from_slice(&secret)
                    .expect("to be able to parse the counter per update secret.");

                let per_update_seed_pk = signed_channel.own_per_update_seed;

                let per_update_seed_sk = self
                    .signer_provider
                    .get_secret_key_for_pubkey(&per_update_seed_pk)?;

                // Own per update secret for the same update, derived from the
                // own per update seed.
                let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
                    per_update_seed_sk.as_ref(),
                    update_idx,
                ))
                .expect("a valid secret key.");

                let per_update_point =
                    PublicKey::from_secret_key(&self.secp, &per_update_secret);

                let own_revocation_params = signed_channel.own_points.get_revokable_params(
                    &self.secp,
                    &signed_channel.counter_points.revocation_basepoint,
                    &per_update_point,
                );

                let counter_per_update_point =
                    PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);

                let base_own_sk = self
                    .signer_provider
                    .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;

                let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);

                let counter_revocation_params =
                    signed_channel.counter_points.get_revokable_params(
                        &self.secp,
                        &signed_channel.own_points.revocation_basepoint,
                        &counter_per_update_point,
                    );

                // Pick witness element 1 or 2 of the first input depending on
                // the ordering of the two fund public keys.
                // NOTE(review): presumably this selects our own signature in
                // the funding multisig witness — confirm.
                let witness = if signed_channel.own_params.fund_pubkey
                    < signed_channel.counter_params.fund_pubkey
                {
                    tx.input[0].witness.to_vec().remove(1)
                } else {
                    tx.input[0].witness.to_vec().remove(2)
                };

                // Drop the last byte of the witness element before parsing it
                // as DER (presumably the sighash type byte — confirm).
                let sig_data = witness
                    .iter()
                    .take(witness.len() - 1)
                    .cloned()
                    .collect::<Vec<_>>();
                let own_sig = Signature::from_der(&sig_data)?;

                // Recover a secret key from our adaptor signature and the
                // signature found on chain.
                let counter_sk = own_adaptor_signature.recover(
                    &self.secp,
                    &own_sig,
                    &counter_revocation_params.publish_pk.inner,
                )?;

                let own_revocation_base_secret =
                    &self.signer_provider.get_secret_key_for_pubkey(
                        &signed_channel.own_points.revocation_basepoint,
                    )?;

                let counter_revocation_sk = derive_private_revocation_key(
                    &self.secp,
                    &counter_per_update_secret,
                    own_revocation_base_secret,
                );

                // Order the revocation parameters by role in the revoked update.
                let (offer_params, accept_params) = if is_offer {
                    (&own_revocation_params, &counter_revocation_params)
                } else {
                    (&counter_revocation_params, &own_revocation_params)
                };

                // The estimator reports sat per 1000 weight units; dividing by
                // 250 converts to sat per virtual byte (1000 WU = 250 vB).
                let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
                    lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
                ) / 250)
                    .into();

                let signed_tx = match revoked_tx_type {
                    RevokedTxType::Buffer => {
                        ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
                            &self.secp,
                            offer_params,
                            accept_params,
                            &own_sk,
                            &counter_sk,
                            &counter_revocation_sk,
                            &tx,
                            &self.wallet.get_new_address().await?,
                            0,
                            fee_rate_per_vb,
                        )?
                    }
                    RevokedTxType::Settle => {
                        ddk_dlc::channel::create_and_sign_punish_settle_transaction(
                            &self.secp,
                            offer_params,
                            accept_params,
                            &own_sk,
                            &counter_sk,
                            &counter_revocation_sk,
                            &tx,
                            &self.wallet.get_new_address().await?,
                            CET_NSEQUENCE,
                            0,
                            fee_rate_per_vb,
                            is_offer,
                        )?
                    }
                };

                self.blockchain.send_transaction(&signed_tx).await?;

                let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
                    counter_party: signed_channel.counter_party,
                    temporary_channel_id: signed_channel.temporary_channel_id,
                    channel_id: signed_channel.channel_id,
                    punish_txid: signed_tx.compute_txid(),
                });

                self.chain_monitor
                    .lock()
                    .await
                    .cleanup_channel(signed_channel.channel_id);
                self.store.upsert_channel(closed_channel, None).await?;
                true
            }
            TxType::CollaborativeClose => {
                // The contract is only closed when the rollback state is
                // Established; the payout is read from the current
                // CollaborativeCloseOffered state.
                if let Some(SignedChannelState::Established {
                    signed_contract_id, ..
                }) = signed_channel.roll_back_state
                {
                    let counter_payout = get_signed_channel_state!(
                        signed_channel,
                        CollaborativeCloseOffered,
                        counter_payout
                    )?;
                    let closed_contract = self
                        .get_collaboratively_closed_contract(
                            &signed_contract_id,
                            *counter_payout,
                            false,
                        )
                        .await?;
                    self.store
                        .update_contract(&Contract::Closed(closed_contract))
                        .await?;
                }
                let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
                    counter_party: signed_channel.counter_party,
                    temporary_channel_id: signed_channel.temporary_channel_id,
                    channel_id: signed_channel.channel_id,
                });
                self.chain_monitor
                    .lock()
                    .await
                    .cleanup_channel(signed_channel.channel_id);
                self.store.upsert_channel(closed_channel, None).await?;
                true
            }
            TxType::SettleTx => {
                let closed_channel = Channel::CounterClosed(ClosedChannel {
                    counter_party: signed_channel.counter_party,
                    temporary_channel_id: signed_channel.temporary_channel_id,
                    channel_id: signed_channel.channel_id,
                });
                self.chain_monitor
                    .lock()
                    .await
                    .cleanup_channel(signed_channel.channel_id);
                self.store.upsert_channel(closed_channel, None).await?;
                true
            }
            TxType::Cet => {
                let contract_id = signed_channel.get_contract_id();
                // In the Closing state `is_initiator` decides between Closed
                // and CounterClosed; any other state is logged and treated as
                // Closed.
                let closed_channel = {
                    match &signed_channel.state {
                        SignedChannelState::Closing { is_initiator, .. } => {
                            if *is_initiator {
                                Channel::Closed(ClosedChannel {
                                    counter_party: signed_channel.counter_party,
                                    temporary_channel_id: signed_channel.temporary_channel_id,
                                    channel_id: signed_channel.channel_id,
                                })
                            } else {
                                Channel::CounterClosed(ClosedChannel {
                                    counter_party: signed_channel.counter_party,
                                    temporary_channel_id: signed_channel.temporary_channel_id,
                                    channel_id: signed_channel.channel_id,
                                })
                            }
                        }
                        _ => {
                            tracing::error!("Saw spending of buffer transaction without being in closing state");
                            Channel::Closed(ClosedChannel {
                                counter_party: signed_channel.counter_party,
                                temporary_channel_id: signed_channel.temporary_channel_id,
                                channel_id: signed_channel.channel_id,
                            })
                        }
                    }
                };

                self.chain_monitor
                    .lock()
                    .await
                    .cleanup_channel(signed_channel.channel_id);
                // `contract_id` is an Option, so the stream yields at most one
                // item. A contract found in the Confirmed state becomes
                // PreClosed with `tx` as its signed CET; anything else yields
                // no contract update.
                let pre_closed_contract = futures::stream::iter(contract_id)
                    .then(|contract_id| {
                        let txn = tx.clone();
                        async move {
                            self.store.get_contract(&contract_id).await.map(|contract| {
                                contract.and_then(|contract| match contract {
                                    Contract::Confirmed(signed_contract) => {
                                        Some(Contract::PreClosed(PreClosedContract {
                                            signed_contract,
                                            attestations: None,
                                            signed_cet: txn,
                                        }))
                                    }
                                    _ => None,
                                })
                            })
                        }
                    })
                    .try_collect::<Vec<_>>()
                    .await?
                    .into_iter()
                    .flatten()
                    .next();

                self.store
                    .upsert_channel(closed_channel, pre_closed_contract)
                    .await?;

                true
            }
        };

        if persist {
            self.store
                .persist_chain_monitor(&*self.chain_monitor.lock().await)
                .await?;
        }
    }
    Ok(())
}
2912
2913 async fn check_for_watched_tx(&self) -> Result<(), Error> {
2914 let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
2915
2916 self.process_watched_txs(confirmed_txs).await?;
2917
2918 self.store
2919 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2920 .await?;
2921
2922 Ok(())
2923 }
2924
2925 async fn force_close_channel_internal(
2926 &self,
2927 mut channel: SignedChannel,
2928 is_initiator: bool,
2929 ) -> Result<(), Error> {
2930 match &channel.state {
2931 SignedChannelState::Established {
2932 counter_buffer_adaptor_signature,
2933 buffer_transaction,
2934 ..
2935 } => {
2936 let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
2937 let buffer_transaction = buffer_transaction.clone();
2938 self.initiate_unilateral_close_established_channel(
2939 channel,
2940 is_initiator,
2941 counter_buffer_adaptor_signature,
2942 buffer_transaction,
2943 )
2944 .await
2945 }
2946 SignedChannelState::RenewFinalized {
2947 buffer_transaction,
2948 offer_buffer_adaptor_signature,
2949 ..
2950 } => {
2951 let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
2952 let buffer_transaction = buffer_transaction.clone();
2953 self.initiate_unilateral_close_established_channel(
2954 channel,
2955 is_initiator,
2956 offer_buffer_adaptor_signature,
2957 buffer_transaction,
2958 )
2959 .await
2960 }
2961 SignedChannelState::Settled { .. } => {
2962 self.close_settled_channel(channel, is_initiator).await
2963 }
2964 SignedChannelState::SettledOffered { .. }
2965 | SignedChannelState::SettledReceived { .. }
2966 | SignedChannelState::SettledAccepted { .. }
2967 | SignedChannelState::SettledConfirmed { .. }
2968 | SignedChannelState::RenewOffered { .. }
2969 | SignedChannelState::RenewAccepted { .. }
2970 | SignedChannelState::RenewConfirmed { .. }
2971 | SignedChannelState::CollaborativeCloseOffered { .. } => {
2972 channel.state = channel
2973 .roll_back_state
2974 .take()
2975 .expect("to have a rollback state");
2976 let channel_clone = channel.clone(); Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
2978 }
2979 SignedChannelState::Closing { .. } => Err(Error::InvalidState(
2980 "Channel is already closing.".to_string(),
2981 )),
2982 }
2983 }
2984
2985 async fn initiate_unilateral_close_established_channel(
2987 &self,
2988 mut signed_channel: SignedChannel,
2989 is_initiator: bool,
2990 buffer_adaptor_signature: EcdsaAdaptorSignature,
2991 buffer_transaction: Transaction,
2992 ) -> Result<(), Error> {
2993 let keys_id = signed_channel.keys_id().ok_or_else(|| {
2994 Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
2995 })?;
2996
2997 crate::channel_updater::initiate_unilateral_close_established_channel(
2998 &self.secp,
2999 &mut signed_channel,
3000 buffer_adaptor_signature,
3001 keys_id,
3002 buffer_transaction,
3003 &self.signer_provider,
3004 is_initiator,
3005 )?;
3006
3007 let buffer_transaction =
3008 get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3009
3010 self.blockchain.send_transaction(buffer_transaction).await?;
3011
3012 self.chain_monitor
3013 .lock()
3014 .await
3015 .remove_tx(&buffer_transaction.compute_txid());
3016
3017 self.store
3018 .upsert_channel(Channel::Signed(signed_channel), None)
3019 .await?;
3020
3021 self.store
3022 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3023 .await?;
3024
3025 Ok(())
3026 }
3027
3028 async fn close_settled_channel(
3030 &self,
3031 signed_channel: SignedChannel,
3032 is_initiator: bool,
3033 ) -> Result<(), Error> {
3034 let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3035 &self.secp,
3036 &signed_channel,
3037 &self.signer_provider,
3038 is_initiator,
3039 )?;
3040
3041 if self
3042 .blockchain
3043 .get_transaction_confirmations(&settle_tx.compute_txid())
3044 .await
3045 .unwrap_or(0)
3046 == 0
3047 {
3048 self.blockchain.send_transaction(&settle_tx).await?;
3049 }
3050
3051 self.chain_monitor
3052 .lock()
3053 .await
3054 .cleanup_channel(signed_channel.channel_id);
3055
3056 self.store.upsert_channel(closed_channel, None).await?;
3057
3058 Ok(())
3059 }
3060
3061 async fn get_collaboratively_closed_contract(
3062 &self,
3063 contract_id: &ContractId,
3064 payout: Amount,
3065 is_own_payout: bool,
3066 ) -> Result<ClosedContract, Error> {
3067 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3068 let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3069 contract
3070 .accepted_contract
3071 .offered_contract
3072 .offer_params
3073 .collateral
3074 } else {
3075 contract.accepted_contract.accept_params.collateral
3076 };
3077 let own_payout = if is_own_payout {
3078 payout
3079 } else {
3080 contract.accepted_contract.offered_contract.total_collateral - payout
3081 };
3082 let pnl =
3083 SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3084 Ok(ClosedContract {
3085 attestations: None,
3086 signed_cet: None,
3087 contract_id: *contract_id,
3088 temporary_contract_id: contract.accepted_contract.offered_contract.id,
3089 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3090 funding_txid: contract
3091 .accepted_contract
3092 .dlc_transactions
3093 .fund
3094 .compute_txid(),
3095 pnl,
3096 })
3097 }
3098
3099 async fn oracle_announcements(
3100 &self,
3101 contract_input: &ContractInput,
3102 ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3103 let announcements = stream::iter(contract_input.contract_infos.iter())
3104 .map(|x| {
3105 let future = self.get_oracle_announcements(&x.oracles);
3106 async move {
3107 match future.await {
3108 Ok(result) => Ok(result),
3109 Err(e) => {
3110 tracing::error!("Failed to get oracle announcements: {}", e);
3111 Err(e)
3112 }
3113 }
3114 }
3115 })
3116 .collect::<FuturesUnordered<_>>()
3117 .await
3118 .try_collect::<Vec<_>>()
3119 .await?;
3120 Ok(announcements)
3121 }
3122}