1use super::{
4 Blockchain, CachedContractSignerProvider, ContractSigner, Oracle, Storage, Time, Wallet,
5};
6use crate::chain_monitor::{ChainMonitor, ChannelInfo, RevokedTxType, TxType};
7use crate::channel::offered_channel::OfferedChannel;
8use crate::channel::signed_channel::{SignedChannel, SignedChannelState, SignedChannelStateType};
9use crate::channel::{Channel, ClosedChannel, ClosedPunishedChannel};
10use crate::channel_updater::get_signed_channel_state;
11use crate::channel_updater::verify_signed_channel;
12use crate::contract::{
13 accepted_contract::AcceptedContract, contract_info::ContractInfo,
14 contract_input::ContractInput, contract_input::OracleInput, offered_contract::OfferedContract,
15 signed_contract::SignedContract, AdaptorInfo, ClosedContract, Contract, FailedAcceptContract,
16 FailedSignContract, PreClosedContract,
17};
18use crate::contract_updater::{accept_contract, verify_accepted_and_sign_contract};
19use crate::error::Error;
20use crate::utils::get_object_in_state;
21use crate::{ChannelId, ContractId, ContractSignerProvider};
22use bitcoin::absolute::Height;
23use bitcoin::consensus::encode::serialize_hex;
24use bitcoin::consensus::Decodable;
25use bitcoin::hex::DisplayHex;
26use bitcoin::{Address, Amount, SignedAmount};
27use bitcoin::{OutPoint, Transaction};
28use ddk_messages::channel::{
29 AcceptChannel, CollaborativeCloseOffer, OfferChannel, Reject, RenewAccept, RenewConfirm,
30 RenewFinalize, RenewOffer, RenewRevoke, SettleAccept, SettleConfirm, SettleFinalize,
31 SettleOffer, SignChannel,
32};
33use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
34use ddk_messages::{AcceptDlc, CloseDlc, Message as DlcMessage, OfferDlc, SignDlc};
35use futures::stream;
36use futures::stream::FuturesUnordered;
37use futures::{StreamExt, TryStreamExt};
38use lightning::chain::chaininterface::FeeEstimator;
39use lightning::ln::chan_utils::{
40 build_commitment_secret, derive_private_key, derive_private_revocation_key,
41};
42use lightning::util::logger::Logger;
43use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
44use secp256k1_zkp::XOnlyPublicKey;
45use secp256k1_zkp::{
46 ecdsa::Signature, All, EcdsaAdaptorSignature, PublicKey, Secp256k1, SecretKey,
47};
48use std::collections::HashMap;
49use std::ops::Deref;
50use std::string::ToString;
51use std::sync::Arc;
52use tokio::sync::Mutex;
53
/// Number of confirmations a transaction must reach before this module
/// treats it as fully confirmed (funding and pending close transactions).
pub const NB_CONFIRMATIONS: u32 = 6;
/// Refund delay handed to the contract updater when offering a contract and
/// used as the lower bound when validating a received offer.
/// NOTE(review): the value is the number of seconds in seven days, so the
/// unit is presumably seconds — confirm against the contract updater.
pub const REFUND_DELAY: u32 = 7 * 86_400;
/// NOTE(review): presumably the nSequence value applied to CETs; it is not
/// referenced in the visible part of this file — confirm at the use site.
pub const CET_NSEQUENCE: u32 = 288;
/// NOTE(review): presumably the time, in seconds (one hour), after which an
/// unresponsive peer is considered timed out — confirm at the use site.
pub const PEER_TIMEOUT: u64 = 60 * 60;
63
64type ClosableContractInfo<'a> = Option<(
65 &'a ContractInfo,
66 &'a AdaptorInfo,
67 Vec<(usize, OracleAttestation)>,
68)>;
69
70#[derive(Debug)]
72pub struct Manager<
73 W: Deref,
74 SP: Deref,
75 B: Deref,
76 S: Deref,
77 O: Deref,
78 T: Deref,
79 F: Deref,
80 X: ContractSigner,
81 L: Deref,
82> where
83 W::Target: Wallet,
84 SP::Target: ContractSignerProvider<Signer = X>,
85 B::Target: Blockchain,
86 S::Target: Storage,
87 O::Target: Oracle,
88 T::Target: Time,
89 F::Target: FeeEstimator,
90 L::Target: Logger,
91{
92 oracles: HashMap<XOnlyPublicKey, O>,
93 wallet: W,
94 signer_provider: SP,
95 blockchain: B,
96 store: S,
97 secp: Secp256k1<All>,
98 chain_monitor: Mutex<ChainMonitor>,
99 time: T,
100 fee_estimator: F,
101 logger: L,
102}
103
/// Forwards to `get_object_in_state!` for contracts: the object kind is
/// fixed to `Contract` and the store accessor to `get_contract`, while the
/// manager, id, expected state and peer id are passed through unchanged.
macro_rules! get_contract_in_state {
    ($mgr: ident, $id: expr, $expected: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected, $peer, Contract, get_contract)
    }};
}
116
/// Forwards to `get_object_in_state!` for channels: the object kind is fixed
/// to `Channel` and the store accessor to `get_channel`, while the manager,
/// id, expected state and peer id are passed through unchanged.
macro_rules! get_channel_in_state {
    ($mgr: ident, $id: expr, $expected: ident, $peer: expr) => {{
        get_object_in_state!($mgr, $id, $expected, $peer, Channel, get_channel)
    }};
}
129
/// Extracts the named fields from a signed channel's *rollback* state.
///
/// Evaluates to `Ok((field, ...))`, with each field borrowed from the
/// channel, when `roll_back_state` is `Some(SignedChannelState::$state { .. })`.
/// Otherwise it evaluates to `Err(Error::InvalidState(..))`, and the message
/// reports the rollback state that was actually found (or `None`). It
/// previously printed the channel's current `state`, which is not the value
/// being matched.
macro_rules! get_signed_channel_rollback_state {
    ($signed_channel: ident, $state: ident, $($field: ident),*) => {{
        match $signed_channel.roll_back_state.as_ref() {
            Some(SignedChannelState::$state { $($field,)* .. }) => Ok(($($field,)*)),
            found => Err(Error::InvalidState(format!(
                "Expected rollback state {} got {:?}",
                stringify!($state),
                found
            ))),
        }
    }};
}
138
/// Force closes every stored signed channel that is in `$state` and whose
/// `timeout` lies before the manager's current unix time.
///
/// Must be expanded inside an `async` function returning `Result`: the store
/// query is awaited and propagated with `?`. A failing force close is only
/// logged, so one bad channel does not stop the others from being processed.
macro_rules! check_for_timed_out_channels {
    ($manager: ident, $state: ident) => {
        let candidates = $manager
            .store
            .get_signed_channels(Some(SignedChannelStateType::$state))
            .await?;

        for candidate in candidates {
            let SignedChannelState::$state { timeout, .. } = candidate.state else {
                continue;
            };
            if timeout >= $manager.time.unix_time_now() {
                continue;
            }
            if let Err(e) = $manager
                .force_close_channel_internal(candidate, true)
                .await
            {
                log_error!($manager.logger, "Error force closing channel. error={}", e);
            }
        }
    };
}
161
162impl<
163 W: Deref,
164 SP: Deref,
165 B: Deref,
166 S: Deref,
167 O: Deref,
168 T: Deref,
169 F: Deref,
170 X: ContractSigner,
171 L: Deref,
172 > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
173where
174 W::Target: Wallet,
175 SP::Target: ContractSignerProvider<Signer = X>,
176 B::Target: Blockchain,
177 S::Target: Storage,
178 O::Target: Oracle,
179 T::Target: Time,
180 F::Target: FeeEstimator,
181 L::Target: Logger,
182{
183 #[allow(clippy::too_many_arguments)]
185 #[tracing::instrument(skip_all)]
186 pub async fn new(
187 wallet: W,
188 signer_provider: SP,
189 blockchain: B,
190 store: S,
191 oracles: HashMap<XOnlyPublicKey, O>,
192 time: T,
193 fee_estimator: F,
194 logger: L,
195 ) -> Result<Self, Error> {
196 let init_height = blockchain.get_blockchain_height().await?;
197 let chain_monitor = Mutex::new(
198 store
199 .get_chain_monitor()
200 .await?
201 .unwrap_or(ChainMonitor::new(init_height)),
202 );
203
204 let signer_provider = Arc::new(CachedContractSignerProvider::new(signer_provider));
205
206 log_info!(logger, "Manager initialized");
207
208 Ok(Manager {
209 secp: secp256k1_zkp::Secp256k1::new(),
210 wallet,
211 signer_provider,
212 blockchain,
213 store,
214 oracles,
215 time,
216 fee_estimator,
217 chain_monitor,
218 logger,
219 })
220 }
221
222 pub fn get_store(&self) -> &S {
224 &self.store
225 }
226
227 #[tracing::instrument(skip_all)]
229 pub async fn on_dlc_message(
230 &self,
231 msg: &DlcMessage,
232 counter_party: PublicKey,
233 ) -> Result<Option<DlcMessage>, Error> {
234 match msg {
235 DlcMessage::Offer(o) => {
236 log_debug!(self.logger, "Received offer message");
237 self.on_offer_message(o, counter_party).await?;
238 Ok(None)
239 }
240 DlcMessage::Accept(a) => {
241 log_debug!(self.logger, "Received accept message");
242 Ok(Some(self.on_accept_message(a, &counter_party).await?))
243 }
244 DlcMessage::Sign(s) => {
245 log_debug!(self.logger, "Received sign message");
246 self.on_sign_message(s, &counter_party).await?;
247 Ok(None)
248 }
249 DlcMessage::Close(c) => {
250 log_debug!(self.logger, "Received close message");
251 self.on_close_message(c, &counter_party).await?;
252 Ok(None)
253 }
254 DlcMessage::OfferChannel(o) => {
255 log_debug!(self.logger, "Received offer channel message");
256 self.on_offer_channel(o, counter_party).await?;
257 Ok(None)
258 }
259 DlcMessage::AcceptChannel(a) => {
260 log_debug!(self.logger, "Received accept channel message");
261 Ok(Some(DlcMessage::SignChannel(
262 self.on_accept_channel(a, &counter_party).await?,
263 )))
264 }
265 DlcMessage::SignChannel(s) => {
266 log_debug!(self.logger, "Received sign channel message");
267 self.on_sign_channel(s, &counter_party).await?;
268 Ok(None)
269 }
270 DlcMessage::SettleOffer(s) => {
271 log_debug!(self.logger, "Received settle offer message");
272 match self.on_settle_offer(s, &counter_party).await? {
273 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
274 None => Ok(None),
275 }
276 }
277 DlcMessage::SettleAccept(s) => {
278 log_debug!(self.logger, "Received settle accept message");
279 Ok(Some(DlcMessage::SettleConfirm(
280 self.on_settle_accept(s, &counter_party).await?,
281 )))
282 }
283 DlcMessage::SettleConfirm(s) => {
284 log_debug!(self.logger, "Received settle confirm message");
285 Ok(Some(DlcMessage::SettleFinalize(
286 self.on_settle_confirm(s, &counter_party).await?,
287 )))
288 }
289 DlcMessage::SettleFinalize(s) => {
290 log_debug!(self.logger, "Received settle finalize message");
291 self.on_settle_finalize(s, &counter_party).await?;
292 Ok(None)
293 }
294 DlcMessage::RenewOffer(r) => {
295 log_debug!(self.logger, "Received renew offer message");
296 match self.on_renew_offer(r, &counter_party).await? {
297 Some(msg) => Ok(Some(DlcMessage::Reject(msg))),
298 None => Ok(None),
299 }
300 }
301 DlcMessage::RenewAccept(r) => {
302 log_debug!(self.logger, "Received renew accept message");
303 Ok(Some(DlcMessage::RenewConfirm(
304 self.on_renew_accept(r, &counter_party).await?,
305 )))
306 }
307 DlcMessage::RenewConfirm(r) => {
308 log_debug!(self.logger, "Received renew confirm message");
309 Ok(Some(DlcMessage::RenewFinalize(
310 self.on_renew_confirm(r, &counter_party).await?,
311 )))
312 }
313 DlcMessage::RenewFinalize(r) => {
314 log_debug!(self.logger, "Received renew finalize message");
315 let revoke = self.on_renew_finalize(r, &counter_party).await?;
316 Ok(Some(DlcMessage::RenewRevoke(revoke)))
317 }
318 DlcMessage::RenewRevoke(r) => {
319 log_debug!(self.logger, "Received renew revoke message");
320 self.on_renew_revoke(r, &counter_party).await?;
321 Ok(None)
322 }
323 DlcMessage::CollaborativeCloseOffer(c) => {
324 log_debug!(self.logger, "Received collaborative close offer message");
325 self.on_collaborative_close_offer(c, &counter_party).await?;
326 Ok(None)
327 }
328 DlcMessage::Reject(r) => {
329 log_debug!(self.logger, "Received reject message");
330 self.on_reject(r, &counter_party).await?;
331 Ok(None)
332 }
333 }
334 }
335
336 #[tracing::instrument(skip_all)]
338 pub async fn send_splice_offer(
339 &self,
340 contract_input: &ContractInput,
341 counter_party: PublicKey,
342 contract_id: &ContractId,
343 ) -> Result<OfferDlc, Error> {
344 let oracle_announcements = self.oracle_announcements(contract_input).await?;
345
346 self.send_splice_offer_with_announcements(
347 contract_input,
348 counter_party,
349 contract_id,
350 oracle_announcements,
351 )
352 .await
353 }
354
355 #[tracing::instrument(skip_all)]
358 pub async fn send_splice_offer_with_announcements(
359 &self,
360 contract_input: &ContractInput,
361 counter_party: PublicKey,
362 contract_id: &ContractId,
363 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
364 ) -> Result<OfferDlc, Error> {
365 let confirmed_contract =
366 get_contract_in_state!(self, contract_id, Confirmed, Some(counter_party))?;
367
368 let dlc_input = confirmed_contract.get_dlc_input();
369
370 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
371 &self.secp,
372 contract_input,
373 oracle_announcements,
374 vec![dlc_input],
375 REFUND_DELAY,
376 &counter_party,
377 &self.wallet,
378 &self.blockchain,
379 &self.time,
380 &self.signer_provider,
381 &self.logger,
382 )
383 .await?;
384
385 offered_contract.validate()?;
386
387 self.store.create_contract(&offered_contract).await?;
388
389 Ok(offer_msg)
390 }
391
392 #[tracing::instrument(skip_all)]
397 pub async fn send_offer(
398 &self,
399 contract_input: &ContractInput,
400 counter_party: PublicKey,
401 ) -> Result<OfferDlc, Error> {
402 let oracle_announcements = self.oracle_announcements(contract_input).await?;
404
405 self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
406 .await
407 }
408
409 #[tracing::instrument(skip_all)]
415 pub async fn send_offer_with_announcements(
416 &self,
417 contract_input: &ContractInput,
418 counter_party: PublicKey,
419 oracle_announcements: Vec<Vec<OracleAnnouncement>>,
420 ) -> Result<OfferDlc, Error> {
421 let (offered_contract, offer_msg) = crate::contract_updater::offer_contract(
422 &self.secp,
423 contract_input,
424 oracle_announcements,
425 vec![],
426 REFUND_DELAY,
427 &counter_party,
428 &self.wallet,
429 &self.blockchain,
430 &self.time,
431 &self.signer_provider,
432 &self.logger,
433 )
434 .await?;
435
436 offered_contract.validate()?;
437
438 self.store.create_contract(&offered_contract).await?;
439
440 Ok(offer_msg)
441 }
442
443 #[tracing::instrument(skip_all)]
445 pub async fn accept_contract_offer(
446 &self,
447 contract_id: &ContractId,
448 ) -> Result<(ContractId, PublicKey, AcceptDlc), Error> {
449 let offered_contract =
450 get_contract_in_state!(self, contract_id, Offered, None as Option<PublicKey>)?;
451
452 let counter_party = offered_contract.counter_party;
453
454 let (accepted_contract, accept_msg) = accept_contract(
455 &self.secp,
456 &offered_contract,
457 &self.wallet,
458 &self.signer_provider,
459 &self.blockchain,
460 &self.logger,
461 )
462 .await?;
463
464 self.wallet.import_address(&Address::p2wsh(
465 &accepted_contract.dlc_transactions.funding_script_pubkey,
466 self.blockchain.get_network()?,
467 ))?;
468
469 let contract_id = accepted_contract.get_contract_id();
470
471 self.store
472 .update_contract(&Contract::Accepted(accepted_contract))
473 .await?;
474
475 log_info!(
476 self.logger,
477 "Accepted and stored the contract. temp_id={} contract_id={}",
478 offered_contract.id.to_lower_hex_string(),
479 contract_id.to_lower_hex_string(),
480 );
481
482 Ok((contract_id, counter_party, accept_msg))
483 }
484
485 #[tracing::instrument(skip_all)]
491 pub async fn periodic_chain_monitor(&self) -> Result<(), Error> {
492 let cur_height = self.blockchain.get_blockchain_height().await?;
493 let last_height = self.chain_monitor.lock().await.last_height;
494
495 if cur_height < last_height {
498 return Err(Error::InvalidState(
499 "Current height is lower than last height.".to_string(),
500 ));
501 }
502
503 for height in last_height + 1..=cur_height {
504 let block = self.blockchain.get_block_at_height(height).await?;
505
506 self.chain_monitor
507 .lock()
508 .await
509 .process_block(&block, height);
510 }
511
512 Ok(())
513 }
514
515 #[tracing::instrument(skip_all, level = "debug")]
518 pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
519 let signed_contracts = self.check_for_spliced_contract().await?;
520 self.check_signed_contracts(&signed_contracts).await?;
521 self.check_confirmed_contracts().await?;
522 self.check_preclosed_contracts().await?;
523
524 if check_channels {
525 self.channel_checks().await?;
526 }
527
528 Ok(())
529 }
530
531 #[tracing::instrument(skip_all)]
533 pub async fn on_offer_message(
534 &self,
535 offered_message: &OfferDlc,
536 counter_party: PublicKey,
537 ) -> Result<(), Error> {
538 offered_message.validate(&self.secp, REFUND_DELAY, REFUND_DELAY * 2)?;
539 let keys_id = self
540 .signer_provider
541 .derive_signer_key_id(false, offered_message.temporary_contract_id);
542 let contract: OfferedContract =
543 OfferedContract::try_from_offer_dlc(offered_message, counter_party, keys_id)?;
544 contract.validate()?;
545
546 if self.store.get_contract(&contract.id).await?.is_some() {
547 return Err(Error::InvalidParameters(
548 "Contract with identical id already exists".to_string(),
549 ));
550 }
551
552 self.store.create_contract(&contract).await?;
553 log_info!(
554 self.logger,
555 "Created and stored the offered contract. temp_id={}",
556 contract.id.to_lower_hex_string(),
557 );
558 Ok(())
559 }
560
561 #[tracing::instrument(skip_all)]
563 pub async fn on_close_message(
564 &self,
565 close_msg: &CloseDlc,
566 counter_party: &PublicKey,
567 ) -> Result<(), Error> {
568 let signed_contract = get_contract_in_state!(
570 self,
571 &close_msg.contract_id,
572 Confirmed,
573 Some(*counter_party)
574 )?;
575
576 let _close_tx = crate::contract_updater::complete_cooperative_close(
579 &self.secp,
580 &signed_contract,
581 close_msg,
582 &self.signer_provider,
583 &self.logger,
584 )?;
585
586 Ok(())
589 }
590
591 #[tracing::instrument(skip_all)]
593 pub async fn on_accept_message(
594 &self,
595 accept_msg: &AcceptDlc,
596 counter_party: &PublicKey,
597 ) -> Result<DlcMessage, Error> {
598 let offered_contract = get_contract_in_state!(
599 self,
600 &accept_msg.temporary_contract_id,
601 Offered,
602 Some(*counter_party)
603 )?;
604
605 let (signed_contract, signed_msg) = match verify_accepted_and_sign_contract(
606 &self.secp,
607 &offered_contract,
608 accept_msg,
609 &self.wallet,
610 &self.signer_provider,
611 &self.store,
612 &self.logger,
613 )
614 .await
615 {
616 Ok(contract) => contract,
617 Err(e) => {
618 log_error!(
619 self.logger,
620 "Error in on_accept_message. tmp_contract_id={} error={}",
621 offered_contract.id.to_lower_hex_string(),
622 e.to_string()
623 );
624 return self
625 .accept_fail_on_error(offered_contract, accept_msg.clone(), e)
626 .await;
627 }
628 };
629
630 log_info!(
631 self.logger,
632 "Verified the accept message and signed the contract. temp_id={} contract_id={}",
633 offered_contract.id.to_lower_hex_string(),
634 signed_contract.accepted_contract.get_contract_id_string(),
635 );
636
637 let contract_id = signed_contract.accepted_contract.get_contract_id_string();
638
639 self.wallet.import_address(&Address::p2wsh(
640 &signed_contract
641 .accepted_contract
642 .dlc_transactions
643 .funding_script_pubkey,
644 self.blockchain.get_network()?,
645 ))?;
646
647 self.store
648 .update_contract(&Contract::Signed(signed_contract))
649 .await?;
650
651 log_info!(
652 self.logger,
653 "Accepted and signed the contract. temp_id={} contract_id={}",
654 offered_contract.id.to_lower_hex_string(),
655 contract_id,
656 );
657
658 Ok(DlcMessage::Sign(signed_msg))
659 }
660
661 #[tracing::instrument(skip_all)]
663 pub async fn on_sign_message(
664 &self,
665 sign_message: &SignDlc,
666 peer_id: &PublicKey,
667 ) -> Result<(), Error> {
668 let accepted_contract =
669 get_contract_in_state!(self, &sign_message.contract_id, Accepted, Some(*peer_id))?;
670 let (signed_contract, fund_tx) = match crate::contract_updater::verify_signed_contract(
671 &self.secp,
672 &accepted_contract,
673 sign_message,
674 &self.wallet,
675 &self.store,
676 &self.signer_provider,
677 &self.logger,
678 )
679 .await
680 {
681 Ok(contract) => contract,
682 Err(e) => {
683 log_error!(
684 self.logger,
685 "Error in on_sign_message. contract_id={} error={}",
686 accepted_contract.get_contract_id_string(),
687 e.to_string()
688 );
689 return self
690 .sign_fail_on_error(accepted_contract, sign_message.clone(), e)
691 .await;
692 }
693 };
694
695 self.store
696 .update_contract(&Contract::Signed(signed_contract.clone()))
697 .await?;
698
699 self.blockchain.send_transaction(&fund_tx).await?;
700
701 if accepted_contract
704 .offered_contract
705 .funding_inputs
706 .iter()
707 .any(|c| c.dlc_input.is_some())
708 {
709 let Some(dlc_input) = accepted_contract
710 .offered_contract
711 .funding_inputs
712 .iter()
713 .find(|c| c.dlc_input.is_some())
714 else {
715 return Ok(());
716 };
717 let preclosed_contract = get_contract_in_state!(
718 self,
719 &dlc_input.dlc_input.as_ref().unwrap().contract_id,
720 Confirmed,
721 None as Option<PublicKey>
722 )?;
723
724 let preclosed_contract = PreClosedContract {
725 signed_contract: preclosed_contract.clone(),
726 attestations: None,
727 signed_cet: signed_contract
728 .accepted_contract
729 .dlc_transactions
730 .fund
731 .clone(),
732 };
733
734 log_debug!(self.logger,
735 "Contract contains a DLC input. Marking the previous contract as pre-closed. dlc_input_contract_id={}",
736 preclosed_contract.signed_contract.accepted_contract.get_contract_id_string()
737 );
738
739 self.store
740 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
741 .await?;
742 }
743
744 Ok(())
745 }
746
747 #[tracing::instrument(skip_all, level = "debug")]
748 async fn get_oracle_announcements(
749 &self,
750 oracle_inputs: &OracleInput,
751 ) -> Result<Vec<OracleAnnouncement>, Error> {
752 let mut announcements = Vec::new();
753 for pubkey in &oracle_inputs.public_keys {
754 let oracle = self
755 .oracles
756 .get(pubkey)
757 .ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
758 let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
759 announcements.push(announcement);
760 }
761
762 Ok(announcements)
763 }
764
765 async fn sign_fail_on_error<R>(
766 &self,
767 accepted_contract: AcceptedContract,
768 sign_message: SignDlc,
769 e: Error,
770 ) -> Result<R, Error> {
771 log_error!(
772 self.logger,
773 "Error in on_sign_message marking contract as failed sign. contract_id={} error={}",
774 accepted_contract.get_contract_id_string(),
775 e.to_string()
776 );
777 self.store
778 .update_contract(&Contract::FailedSign(FailedSignContract {
779 accepted_contract,
780 sign_message,
781 error_message: e.to_string(),
782 }))
783 .await?;
784 Err(e)
785 }
786
787 async fn accept_fail_on_error<R>(
788 &self,
789 offered_contract: OfferedContract,
790 accept_message: AcceptDlc,
791 e: Error,
792 ) -> Result<R, Error> {
793 log_error!(
794 self.logger,
795 "Error in on_accept_message marking contract as failed accept. contract_id={} error={}",
796 offered_contract.id.to_lower_hex_string(),
797 e.to_string()
798 );
799 self.store
800 .update_contract(&Contract::FailedAccept(FailedAcceptContract {
801 offered_contract,
802 accept_message,
803 error_message: e.to_string(),
804 }))
805 .await?;
806 Err(e)
807 }
808
809 #[tracing::instrument(skip_all, level = "debug")]
810 async fn check_signed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
811 let confirmations = self
812 .blockchain
813 .get_transaction_confirmations(
814 &contract
815 .accepted_contract
816 .dlc_transactions
817 .fund
818 .compute_txid(),
819 )
820 .await?;
821 if confirmations >= NB_CONFIRMATIONS {
822 log_info!(
823 self.logger,
824 "Marking signed contract as confirmed. confirmations={} contract_id={}",
825 confirmations,
826 contract.accepted_contract.get_contract_id_string(),
827 );
828 self.store
829 .update_contract(&Contract::Confirmed(contract.clone()))
830 .await?;
831 } else {
832 log_debug!(self.logger,
833 "Not enough confirmations to mark contract as confirmed. confirmations={} required={} contract_id={}",
834 confirmations,
835 NB_CONFIRMATIONS,
836 contract.accepted_contract.get_contract_id_string(),
837 );
838 }
839 Ok(())
840 }
841
842 #[tracing::instrument(skip_all, level = "debug")]
843 async fn check_signed_contracts(
844 &self,
845 signed_contracts: &[SignedContract],
846 ) -> Result<(), Error> {
847 for c in signed_contracts {
848 if let Err(e) = self.check_signed_contract(c).await {
849 log_error!(
850 self.logger,
851 "Error checking signed contract. contract_id={} error={}",
852 c.accepted_contract.get_contract_id_string(),
853 e.to_string()
854 )
855 }
856 }
857
858 Ok(())
859 }
860
861 #[tracing::instrument(skip_all, level = "debug")]
862 async fn check_for_spliced_contract(&self) -> Result<Vec<SignedContract>, Error> {
863 let contracts = self.get_store().get_signed_contracts().await?;
864 for contract in &contracts {
865 let dlc_input = contract
866 .accepted_contract
867 .offered_contract
868 .funding_inputs
869 .iter()
870 .find(|d| d.dlc_input.is_some());
871
872 let Some(funding_input) = dlc_input else {
873 continue;
874 };
875
876 let contract_id = funding_input.dlc_input.as_ref().unwrap().contract_id;
877
878 let confirmed_contract_in_splice = match get_contract_in_state!(
879 self,
880 &contract_id,
881 Confirmed,
882 None as Option<PublicKey>
883 ) {
884 Ok(c) => Ok(c),
885 Err(Error::InvalidState(e)) => {
886 log_trace!(self.logger,
887 "The previouse contract referenced in a splice transaction is in an unexpected state. contract_id={} error={}",
888 contract_id.to_lower_hex_string(), e.to_string(),
889 );
890 continue;
891 }
892 Err(e) => {
893 log_debug!(self.logger,
894 "The previouse contract referenced in a splice transaction failed to retrieve. contract_id={} error={}",
895 contract_id.to_lower_hex_string(), e.to_string(),
896 );
897 Err(e)
898 }
899 }?;
900
901 let preclosed_contract = PreClosedContract {
902 signed_contract: confirmed_contract_in_splice.clone(),
903 attestations: None,
904 signed_cet: contract.accepted_contract.dlc_transactions.fund.clone(),
905 };
906
907 log_debug!(self.logger,
908 "The previous contract that was spliced is now closed because the splice contract is confirmed. contract_id={}",
909 contract_id.to_lower_hex_string(),
910 );
911
912 self.store
913 .update_contract(&Contract::PreClosed(preclosed_contract.clone()))
914 .await?;
915 }
916 Ok(contracts)
917 }
918
919 #[tracing::instrument(skip_all, level = "debug")]
920 async fn check_confirmed_contracts(&self) -> Result<(), Error> {
921 for c in self.store.get_confirmed_contracts().await? {
922 if c.channel_id.is_some() {
924 continue;
925 }
926 if let Err(e) = self.check_confirmed_contract(&c).await {
927 log_error!(
928 self.logger,
929 "Error checking confirmed contract. contract_id={} error={}",
930 c.accepted_contract.get_contract_id_string(),
931 e.to_string()
932 )
933 }
934 }
935
936 Ok(())
937 }
938
939 #[tracing::instrument(skip_all, level = "debug")]
940 async fn get_closable_contract_info<'a>(
941 &'a self,
942 contract: &'a SignedContract,
943 ) -> ClosableContractInfo<'a> {
944 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
945 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
946 for (contract_info, adaptor_info) in contract_infos.iter().zip(adaptor_infos.iter()) {
947 log_debug!(
948 self.logger,
949 "Checking contract for oracle maturation. contract_id={}",
950 contract.accepted_contract.get_contract_id_string()
951 );
952 let matured: Vec<_> = contract_info
953 .oracle_announcements
954 .iter()
955 .filter(|x| {
956 (x.oracle_event.event_maturity_epoch as u64) <= self.time.unix_time_now()
957 })
958 .enumerate()
959 .collect();
960 if matured.len() >= contract_info.threshold {
961 let attestations = stream::iter(matured.iter())
962 .map(|(i, announcement)| async move {
963 log_debug!(self.logger,
964 "Oracle announcement for contract is matured. Getting attestations. contract_id={} event_id={}",
965 contract.accepted_contract.get_contract_id_string(),
966 announcement.oracle_event.event_id
967 );
968 let oracle = match self.oracles.get(&announcement.oracle_public_key) {
970 Some(oracle) => oracle,
971 None => {
972 log_debug!(self.logger,
973 "Oracle not found. pubkey={}. contract_id={} event_id={}",
974 announcement.oracle_public_key,
975 contract.accepted_contract.get_contract_id_string(),
976 announcement.oracle_event.event_id
977 );
978 return None;
979 }
980 };
981 let attestation = match oracle
983 .get_attestation(&announcement.oracle_event.event_id)
984 .await
985 {
986 Ok(attestation) => attestation,
987 Err(e) => {
988 log_error!(self.logger,
989 "Attestation not found for event. pubkey={} event_id={} error={}",
990 announcement.oracle_public_key,
991 announcement.oracle_event.event_id,
992 e.to_string()
993 );
994 return None;
995 }
996 };
997 if let Err(e) = attestation.validate(&self.secp, announcement) {
999 log_error!(self.logger,
1000 "Oracle attestation is not valid. pubkey={} event_id={}, error={}",
1001 announcement.oracle_public_key,
1002 announcement.oracle_event.event_id,
1003 e.to_string()
1004 );
1005 return None;
1006 }
1007 log_info!(self.logger,
1008 "Retrieved a valid attestation. pubkey={} event_id={} outcomes={:?}",
1009 announcement.oracle_public_key,
1010 announcement.oracle_event.event_id,
1011 attestation.outcomes
1012 );
1013 Some((*i, attestation))
1014 })
1015 .collect::<FuturesUnordered<_>>()
1016 .await
1017 .filter_map(|result| async move { result }) .collect::<Vec<_>>()
1019 .await;
1020 if attestations.len() >= contract_info.threshold {
1021 log_info!(self.logger,
1022 "Found enough attestations to close contract. contract_id={} attestations={}",
1023 contract.accepted_contract.get_contract_id_string(),
1024 attestations.len()
1025 );
1026 return Some((contract_info, adaptor_info, attestations));
1027 }
1028 }
1029 }
1030 None
1031 }
1032
1033 #[tracing::instrument(skip_all, level = "debug")]
1034 async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
1035 log_debug!(
1036 self.logger,
1037 "Checking confirmed contract. contract_id={}",
1038 contract.accepted_contract.get_contract_id_string()
1039 );
1040 let closable_contract_info = self.get_closable_contract_info(contract).await;
1041 if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
1042 log_debug!(
1043 self.logger,
1044 "Found closable contract info. contract_id={} attestations={}",
1045 contract.accepted_contract.get_contract_id_string(),
1046 attestations.len()
1047 );
1048 let offer = &contract.accepted_contract.offered_contract;
1049 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1050
1051 if let Ok(cet) = crate::contract_updater::get_signed_cet(
1057 &self.secp,
1058 contract,
1059 contract_info,
1060 adaptor_info,
1061 &attestations,
1062 &signer,
1063 &self.logger,
1064 ) {
1065 log_info!(
1066 self.logger,
1067 "Found valid CET. Closing contract. contract_id={}",
1068 contract.accepted_contract.get_contract_id_string()
1069 );
1070 match self
1071 .close_contract(
1072 contract,
1073 cet,
1074 attestations.iter().map(|x| x.1.clone()).collect(),
1075 )
1076 .await
1077 {
1078 Ok(closed_contract) => {
1079 log_info!(
1080 self.logger,
1081 "Updated contract to closed. contract_id={}",
1082 contract.accepted_contract.get_contract_id_string()
1083 );
1084 self.store.update_contract(&closed_contract).await?;
1085 return Ok(());
1086 }
1087 Err(e) => {
1088 log_warn!(
1089 self.logger,
1090 "Failed to close contract. contract_id={} error={}",
1091 contract.accepted_contract.get_contract_id_string(),
1092 e.to_string()
1093 );
1094 return Err(e);
1095 }
1096 }
1097 }
1098 }
1099
1100 for pending_close_tx in &contract
1102 .accepted_contract
1103 .dlc_transactions
1104 .pending_close_txs
1105 {
1106 let confirmations = self
1107 .blockchain
1108 .get_transaction_confirmations(&pending_close_tx.compute_txid())
1109 .await?;
1110
1111 log_debug!(
1112 self.logger,
1113 "Checking pending close transaction. close_txid={} contract_id={} confirmations={}",
1114 pending_close_tx.compute_txid().to_string(),
1115 contract.accepted_contract.get_contract_id_string(),
1116 confirmations
1117 );
1118
1119 if confirmations >= NB_CONFIRMATIONS {
1120 log_info!(self.logger,
1122 "Pending close transaction is fully confirmed. Moving to closed. close_txid={} contract_id={}",
1123 pending_close_tx.compute_txid().to_string(),
1124 contract.accepted_contract.get_contract_id_string()
1125 );
1126 let pnl = contract.accepted_contract.compute_pnl(pending_close_tx);
1127 let closed_contract = ClosedContract {
1128 attestations: None, signed_cet: Some(pending_close_tx.clone()), contract_id: contract.accepted_contract.get_contract_id(),
1131 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1132 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1133 pnl,
1134 funding_txid: contract
1135 .accepted_contract
1136 .dlc_transactions
1137 .fund
1138 .compute_txid(),
1139 };
1140
1141 self.store
1142 .update_contract(&Contract::Closed(closed_contract))
1143 .await?;
1144 break; } else if confirmations >= 1 {
1146 log_debug!(self.logger,
1148 "Found a confirmed but not fully confirmed pending close. Moving to preclosed. close_txid={} contract_id={}",
1149 pending_close_tx.compute_txid().to_string(),
1150 contract.accepted_contract.get_contract_id_string()
1151 );
1152 let preclosed_contract = PreClosedContract {
1153 signed_contract: contract.clone(),
1154 attestations: None, signed_cet: pending_close_tx.clone(),
1156 };
1157
1158 self.store
1159 .update_contract(&Contract::PreClosed(preclosed_contract))
1160 .await?;
1161 break; }
1163 }
1164
1165 self.check_refund(contract).await?;
1166
1167 Ok(())
1168 }
1169
1170 #[tracing::instrument(skip_all, level = "debug")]
1172 pub async fn close_confirmed_contract(
1173 &self,
1174 contract_id: &ContractId,
1175 attestations: Vec<(usize, OracleAttestation)>,
1176 ) -> Result<Contract, Error> {
1177 log_info!(
1178 self.logger,
1179 "Attempting to close confirmed contract manually. contract_id={} outcomes={:?}",
1180 contract_id.to_lower_hex_string(),
1181 attestations
1182 .iter()
1183 .map(|(_, a)| a.outcomes.clone())
1184 .collect::<Vec<_>>()
1185 );
1186 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
1187 let contract_infos = &contract.accepted_contract.offered_contract.contract_info;
1188 let adaptor_infos = &contract.accepted_contract.adaptor_infos;
1189
1190 if let Some((contract_info, adaptor_info)) =
1192 contract_infos.iter().zip(adaptor_infos).find(|(c, _)| {
1193 let matches = attestations
1194 .iter()
1195 .filter(|(i, a)| {
1196 c.oracle_announcements[*i].oracle_event.oracle_nonces == a.nonces()
1197 })
1198 .count();
1199
1200 matches >= c.threshold
1201 })
1202 {
1203 let offer = &contract.accepted_contract.offered_contract;
1204 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1205 log_debug!(
1206 self.logger,
1207 "Getting signed CET. contract_id={}",
1208 contract.accepted_contract.get_contract_id_string()
1209 );
1210 let cet = crate::contract_updater::get_signed_cet(
1211 &self.secp,
1212 &contract,
1213 contract_info,
1214 adaptor_info,
1215 &attestations,
1216 &signer,
1217 &self.logger,
1218 )?;
1219
1220 let time = bitcoin::absolute::Time::from_consensus(self.time.unix_time_now() as u32)
1222 .expect("Time is not in valid range. This should never happen.");
1223 let height =
1224 Height::from_consensus(self.blockchain.get_blockchain_height().await? as u32)
1225 .expect("Height is not in valid range. This should never happen.");
1226 let locktime = cet.lock_time;
1227
1228 if !locktime.is_satisfied_by(height, time) {
1229 return Err(Error::InvalidState(
1230 "CET lock time has not passed yet".to_string(),
1231 ));
1232 }
1233
1234 match self
1235 .close_contract(
1236 &contract,
1237 cet,
1238 attestations.into_iter().map(|x| x.1).collect(),
1239 )
1240 .await
1241 {
1242 Ok(closed_contract) => {
1243 log_info!(
1244 self.logger,
1245 "Closed contract manually. contract_id={}",
1246 contract.accepted_contract.get_contract_id_string()
1247 );
1248 self.store.update_contract(&closed_contract).await?;
1249 Ok(closed_contract)
1250 }
1251 Err(e) => {
1252 log_error!(
1253 self.logger,
1254 "Failed to close contract. contract_id={} error={}",
1255 contract.accepted_contract.get_contract_id_string(),
1256 e.to_string()
1257 );
1258 Err(e)
1259 }
1260 }
1261 } else {
1262 Err(Error::InvalidState(
1263 "Attestations did not match contract infos".to_string(),
1264 ))
1265 }
1266 }
1267
1268 #[tracing::instrument(skip_all, level = "debug")]
1269 async fn check_preclosed_contracts(&self) -> Result<(), Error> {
1270 for c in self.store.get_preclosed_contracts().await? {
1271 if let Err(e) = self.check_preclosed_contract(&c).await {
1272 log_error!(
1273 self.logger,
1274 "Error checking pre-closed contract. contract_id={} error={}",
1275 c.signed_contract.accepted_contract.get_contract_id_string(),
1276 e
1277 )
1278 }
1279 }
1280
1281 Ok(())
1282 }
1283
1284 #[tracing::instrument(skip_all, level = "debug")]
1285 async fn check_preclosed_contract(&self, contract: &PreClosedContract) -> Result<(), Error> {
1286 let broadcasted_txid = contract.signed_cet.compute_txid();
1287 let confirmations = self
1288 .blockchain
1289 .get_transaction_confirmations(&broadcasted_txid)
1290 .await?;
1291 log_debug!(
1292 self.logger,
1293 "Checking pre-closed contract. broadcasted_txid={} contract_id={} confirmations={}",
1294 broadcasted_txid.to_string(),
1295 contract
1296 .signed_contract
1297 .accepted_contract
1298 .get_contract_id_string(),
1299 confirmations
1300 );
1301 if confirmations >= NB_CONFIRMATIONS {
1302 log_debug!(self.logger,
1303 "Pre-closed contract is fully confirmed. Moving to closed. broadcasted_txid={} contract_id={}",
1304 broadcasted_txid.to_string(),
1305 contract.signed_contract.accepted_contract.get_contract_id_string()
1306 );
1307 let pnl = contract
1308 .signed_contract
1309 .accepted_contract
1310 .compute_pnl(&contract.signed_cet);
1311
1312 let closed_contract = ClosedContract {
1313 attestations: contract.attestations.clone(),
1314 signed_cet: Some(contract.signed_cet.clone()),
1315 contract_id: contract.signed_contract.accepted_contract.get_contract_id(),
1316 temporary_contract_id: contract
1317 .signed_contract
1318 .accepted_contract
1319 .offered_contract
1320 .id,
1321 counter_party_id: contract
1322 .signed_contract
1323 .accepted_contract
1324 .offered_contract
1325 .counter_party,
1326 funding_txid: contract
1327 .signed_contract
1328 .accepted_contract
1329 .dlc_transactions
1330 .fund
1331 .compute_txid(),
1332 pnl,
1333 };
1334 self.store
1335 .update_contract(&Contract::Closed(closed_contract))
1336 .await?;
1337 }
1338
1339 Ok(())
1340 }
1341
1342 #[tracing::instrument(skip_all, level = "debug")]
1343 async fn close_contract(
1344 &self,
1345 contract: &SignedContract,
1346 signed_cet: Transaction,
1347 attestations: Vec<OracleAttestation>,
1348 ) -> Result<Contract, Error> {
1349 let confirmations = self
1350 .blockchain
1351 .get_transaction_confirmations(&signed_cet.compute_txid())
1352 .await?;
1353
1354 if confirmations < 1 {
1355 log_info!(
1356 self.logger,
1357 "Broadcasting signed CET. txid={} contract_id={}",
1358 signed_cet.compute_txid().to_string(),
1359 contract.accepted_contract.get_contract_id_string()
1360 );
1361 self.blockchain.send_transaction(&signed_cet).await?;
1366
1367 let preclosed_contract = PreClosedContract {
1368 signed_contract: contract.clone(),
1369 attestations: Some(attestations),
1370 signed_cet,
1371 };
1372
1373 return Ok(Contract::PreClosed(preclosed_contract));
1374 } else if confirmations < NB_CONFIRMATIONS {
1375 log_debug!(self.logger,
1376 "Found a confirmed but not fully confirmed pending close. Moving to preclosed. txid={} contract_id={}",
1377 signed_cet.compute_txid().to_string(),
1378 contract.accepted_contract.get_contract_id_string()
1379 );
1380 let preclosed_contract = PreClosedContract {
1381 signed_contract: contract.clone(),
1382 attestations: Some(attestations),
1383 signed_cet,
1384 };
1385
1386 return Ok(Contract::PreClosed(preclosed_contract));
1387 }
1388
1389 let closed_contract = ClosedContract {
1390 attestations: Some(attestations.to_vec()),
1391 pnl: contract.accepted_contract.compute_pnl(&signed_cet),
1392 signed_cet: Some(signed_cet),
1393 contract_id: contract.accepted_contract.get_contract_id(),
1394 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1395 funding_txid: contract
1396 .accepted_contract
1397 .dlc_transactions
1398 .fund
1399 .compute_txid(),
1400 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1401 };
1402
1403 Ok(Contract::Closed(closed_contract))
1404 }
1405
1406 #[tracing::instrument(skip_all, level = "debug")]
1408 async fn check_refund(&self, contract: &SignedContract) -> Result<(), Error> {
1409 if contract
1411 .accepted_contract
1412 .dlc_transactions
1413 .refund
1414 .lock_time
1415 .to_consensus_u32() as u64
1416 <= self.time.unix_time_now()
1417 {
1418 log_debug!(self.logger,
1419 "Refund locktime has passed. Attempting to broadcast refund. refund_txid={} contract_id={}",
1420 contract.accepted_contract.dlc_transactions.refund.compute_txid().to_string(),
1421 contract.accepted_contract.get_contract_id_string()
1422 );
1423 let accepted_contract = &contract.accepted_contract;
1424 let refund = accepted_contract.dlc_transactions.refund.clone();
1425 let confirmations = self
1426 .blockchain
1427 .get_transaction_confirmations(&refund.compute_txid())
1428 .await?;
1429 if confirmations == 0 {
1430 log_debug!(self.logger,
1431 "Refund transaction has not been broadcast yet. Sending transaction txid={} contract_id={}",
1432 refund.compute_txid().to_string(),
1433 contract.accepted_contract.get_contract_id_string()
1434 );
1435 let offer = &contract.accepted_contract.offered_contract;
1436 let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
1437 let refund = crate::contract_updater::get_signed_refund(
1438 &self.secp,
1439 contract,
1440 &signer,
1441 &self.logger,
1442 )?;
1443 self.blockchain.send_transaction(&refund).await?;
1444 }
1445
1446 self.store
1447 .update_contract(&Contract::Refunded(contract.clone()))
1448 .await?;
1449 }
1450
1451 Ok(())
1452 }
1453
1454 #[tracing::instrument(skip_all, level = "debug")]
1457 pub async fn on_counterparty_close(
1458 &mut self,
1459 contract: &SignedContract,
1460 closing_tx: Transaction,
1461 confirmations: u32,
1462 ) -> Result<Contract, Error> {
1463 if !closing_tx.input.iter().any(|i| {
1465 i.previous_output
1466 == contract
1467 .accepted_contract
1468 .dlc_transactions
1469 .get_fund_outpoint()
1470 }) {
1471 log_error!(
1472 self.logger,
1473 "Closing tx does not spend the funding tx. txid={} contract_id={}",
1474 closing_tx.compute_txid().to_string(),
1475 contract.accepted_contract.get_contract_id_string()
1476 );
1477 return Err(Error::InvalidParameters(
1478 "Closing tx does not spend the funding tx".to_string(),
1479 ));
1480 }
1481
1482 if contract
1484 .accepted_contract
1485 .dlc_transactions
1486 .refund
1487 .compute_txid()
1488 == closing_tx.compute_txid()
1489 {
1490 log_debug!(
1491 self.logger,
1492 "Closing tx is the refund tx. Moving to refunded. txid={} contract_id={}",
1493 closing_tx.compute_txid().to_string(),
1494 contract.accepted_contract.get_contract_id_string()
1495 );
1496 let refunded = Contract::Refunded(contract.clone());
1497 self.store.update_contract(&refunded).await?;
1498 return Ok(refunded);
1499 }
1500
1501 let contract = if confirmations < NB_CONFIRMATIONS {
1502 log_info!(self.logger,
1503 "Closing transaction is not fully confirmed. Moving to preclosed. txid={} contract_id={}",
1504 closing_tx.compute_txid().to_string(),
1505 contract.accepted_contract.get_contract_id_string()
1506 );
1507 Contract::PreClosed(PreClosedContract {
1508 signed_contract: contract.clone(),
1509 attestations: None, signed_cet: closing_tx,
1511 })
1512 } else {
1513 log_info!(
1514 self.logger,
1515 "Closing transaction is fully confirmed. Moving to closed. txid={} contract_id={}",
1516 closing_tx.compute_txid().to_string(),
1517 contract.accepted_contract.get_contract_id_string()
1518 );
1519 Contract::Closed(ClosedContract {
1520 attestations: None, pnl: contract.accepted_contract.compute_pnl(&closing_tx),
1522 signed_cet: Some(closing_tx),
1523 contract_id: contract.accepted_contract.get_contract_id(),
1524 temporary_contract_id: contract.accepted_contract.offered_contract.id,
1525 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
1526 funding_txid: contract
1527 .accepted_contract
1528 .dlc_transactions
1529 .fund
1530 .compute_txid(),
1531 })
1532 };
1533
1534 self.store.update_contract(&contract).await?;
1535
1536 Ok(contract)
1537 }
1538
1539 #[tracing::instrument(skip_all, level = "debug")]
1543 pub async fn cooperative_close_contract(
1544 &self,
1545 contract_id: &ContractId,
1546 counter_payout: Amount,
1547 ) -> Result<(CloseDlc, PublicKey), Error> {
1548 let signed_contract =
1549 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1550
1551 let (close_message, close_tx) = crate::contract_updater::create_cooperative_close(
1552 &self.secp,
1553 &signed_contract,
1554 counter_payout,
1555 &self.signer_provider,
1556 &self.logger,
1557 )?;
1558
1559 let mut updated_dlc_transactions =
1561 signed_contract.accepted_contract.dlc_transactions.clone();
1562 updated_dlc_transactions
1563 .pending_close_txs
1564 .push(close_tx.clone());
1565 log_debug!(
1566 self.logger,
1567 "Created updated contract with pending close transaction. close_txid={} contract_id={}",
1568 close_tx.compute_txid().to_string(),
1569 contract_id.to_lower_hex_string()
1570 );
1571
1572 let updated_accepted_contract = AcceptedContract {
1573 dlc_transactions: updated_dlc_transactions,
1574 ..signed_contract.accepted_contract.clone()
1575 };
1576
1577 let updated_signed_contract = SignedContract {
1578 accepted_contract: updated_accepted_contract,
1579 ..signed_contract.clone()
1580 };
1581
1582 self.store
1584 .update_contract(&Contract::Confirmed(updated_signed_contract))
1585 .await?;
1586
1587 let counter_party = signed_contract
1588 .accepted_contract
1589 .offered_contract
1590 .counter_party;
1591
1592 Ok((close_message, counter_party))
1593 }
1594
1595 #[tracing::instrument(skip_all, level = "debug")]
1598 pub async fn accept_cooperative_close(
1599 &self,
1600 contract_id: &ContractId,
1601 close_message: &CloseDlc,
1602 ) -> Result<(), Error> {
1603 let signed_contract =
1604 get_contract_in_state!(self, contract_id, Confirmed, None as Option<PublicKey>)?;
1605
1606 let close_tx = crate::contract_updater::complete_cooperative_close(
1607 &self.secp,
1608 &signed_contract,
1609 close_message,
1610 &self.signer_provider,
1611 &self.logger,
1612 )?;
1613
1614 log_debug!(
1615 self.logger,
1616 "Completed cooperative close. close_txid={} contract_id={}",
1617 close_tx.compute_txid().to_string(),
1618 contract_id.to_lower_hex_string()
1619 );
1620 self.blockchain.send_transaction(&close_tx).await?;
1622
1623 let preclosed_contract = PreClosedContract {
1625 signed_contract,
1626 attestations: None,
1627 signed_cet: close_tx,
1628 };
1629
1630 self.store
1631 .update_contract(&Contract::PreClosed(preclosed_contract))
1632 .await?;
1633
1634 Ok(())
1635 }
1636}
1637
1638impl<
1639 W: Deref,
1640 SP: Deref,
1641 B: Deref,
1642 S: Deref,
1643 O: Deref,
1644 T: Deref,
1645 F: Deref,
1646 X: ContractSigner,
1647 L: Deref,
1648 > Manager<W, Arc<CachedContractSignerProvider<SP, X>>, B, S, O, T, F, X, L>
1649where
1650 W::Target: Wallet,
1651 SP::Target: ContractSignerProvider<Signer = X>,
1652 B::Target: Blockchain,
1653 S::Target: Storage,
1654 O::Target: Oracle,
1655 T::Target: Time,
1656 F::Target: FeeEstimator,
1657 L::Target: Logger,
1658{
1659 pub async fn offer_channel(
1662 &self,
1663 contract_input: &ContractInput,
1664 counter_party: PublicKey,
1665 ) -> Result<OfferChannel, Error> {
1666 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1667
1668 let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
1669 &self.secp,
1670 contract_input,
1671 &counter_party,
1672 &oracle_announcements,
1673 CET_NSEQUENCE,
1674 REFUND_DELAY,
1675 &self.wallet,
1676 &self.signer_provider,
1677 &self.blockchain,
1678 &self.time,
1679 crate::utils::get_new_temporary_id(),
1680 )
1681 .await?;
1682
1683 let msg = offered_channel.get_offer_channel_msg(&offered_contract);
1684
1685 self.store
1686 .upsert_channel(
1687 Channel::Offered(offered_channel),
1688 Some(Contract::Offered(offered_contract)),
1689 )
1690 .await?;
1691
1692 Ok(msg)
1693 }
1694
1695 pub async fn reject_channel(
1698 &self,
1699 channel_id: &ChannelId,
1700 ) -> Result<(Reject, PublicKey), Error> {
1701 let offered_channel =
1702 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1703
1704 if offered_channel.is_offer_party {
1705 return Err(Error::InvalidState(
1706 "Cannot reject channel initiated by us.".to_string(),
1707 ));
1708 }
1709
1710 let offered_contract = get_contract_in_state!(
1711 self,
1712 &offered_channel.offered_contract_id,
1713 Offered,
1714 None as Option<PublicKey>
1715 )?;
1716
1717 let counterparty = offered_channel.counter_party;
1718 self.store
1719 .upsert_channel(
1720 Channel::Cancelled(offered_channel),
1721 Some(Contract::Rejected(offered_contract)),
1722 )
1723 .await?;
1724
1725 let msg = Reject {
1726 channel_id: *channel_id,
1727 };
1728 Ok((msg, counterparty))
1729 }
1730
1731 pub async fn accept_channel(
1735 &self,
1736 channel_id: &ChannelId,
1737 ) -> Result<(AcceptChannel, ChannelId, ContractId, PublicKey), Error> {
1738 let offered_channel =
1739 get_channel_in_state!(self, channel_id, Offered, None as Option<PublicKey>)?;
1740
1741 if offered_channel.is_offer_party {
1742 return Err(Error::InvalidState(
1743 "Cannot accept channel initiated by us.".to_string(),
1744 ));
1745 }
1746
1747 let offered_contract = get_contract_in_state!(
1748 self,
1749 &offered_channel.offered_contract_id,
1750 Offered,
1751 None as Option<PublicKey>
1752 )?;
1753
1754 let (accepted_channel, accepted_contract, accept_channel) =
1755 crate::channel_updater::accept_channel_offer(
1756 &self.secp,
1757 &offered_channel,
1758 &offered_contract,
1759 &self.wallet,
1760 &self.signer_provider,
1761 &self.blockchain,
1762 )
1763 .await?;
1764
1765 self.wallet.import_address(&Address::p2wsh(
1766 &accepted_contract.dlc_transactions.funding_script_pubkey,
1767 self.blockchain.get_network()?,
1768 ))?;
1769
1770 let channel_id = accepted_channel.channel_id;
1771 let contract_id = accepted_contract.get_contract_id();
1772 let counter_party = accepted_contract.offered_contract.counter_party;
1773
1774 self.store
1775 .upsert_channel(
1776 Channel::Accepted(accepted_channel),
1777 Some(Contract::Accepted(accepted_contract)),
1778 )
1779 .await?;
1780
1781 Ok((accept_channel, channel_id, contract_id, counter_party))
1782 }
1783
1784 pub async fn force_close_channel(&self, channel_id: &ChannelId) -> Result<(), Error> {
1786 let channel = get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1787
1788 self.force_close_channel_internal(channel, true).await
1789 }
1790
1791 pub async fn settle_offer(
1795 &self,
1796 channel_id: &ChannelId,
1797 counter_payout: Amount,
1798 ) -> Result<(SettleOffer, PublicKey), Error> {
1799 let mut signed_channel =
1800 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1801
1802 let msg = crate::channel_updater::settle_channel_offer(
1803 &self.secp,
1804 &mut signed_channel,
1805 counter_payout,
1806 PEER_TIMEOUT,
1807 &self.signer_provider,
1808 &self.time,
1809 )?;
1810
1811 let counter_party = signed_channel.counter_party;
1812
1813 self.store
1814 .upsert_channel(Channel::Signed(signed_channel), None)
1815 .await?;
1816
1817 Ok((msg, counter_party))
1818 }
1819
1820 pub async fn accept_settle_offer(
1823 &self,
1824 channel_id: &ChannelId,
1825 ) -> Result<(SettleAccept, PublicKey), Error> {
1826 let mut signed_channel =
1827 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1828
1829 let msg = crate::channel_updater::settle_channel_accept(
1830 &self.secp,
1831 &mut signed_channel,
1832 CET_NSEQUENCE,
1833 0,
1834 PEER_TIMEOUT,
1835 &self.signer_provider,
1836 &self.time,
1837 &self.chain_monitor,
1838 )
1839 .await?;
1840
1841 let counter_party = signed_channel.counter_party;
1842
1843 self.store
1844 .upsert_channel(Channel::Signed(signed_channel), None)
1845 .await?;
1846
1847 Ok((msg, counter_party))
1848 }
1849
1850 pub async fn renew_offer(
1854 &self,
1855 channel_id: &ChannelId,
1856 counter_payout: Amount,
1857 contract_input: &ContractInput,
1858 ) -> Result<(RenewOffer, PublicKey), Error> {
1859 let mut signed_channel =
1860 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1861
1862 let oracle_announcements = self.oracle_announcements(contract_input).await?;
1863
1864 let (msg, offered_contract) = crate::channel_updater::renew_offer(
1865 &self.secp,
1866 &mut signed_channel,
1867 contract_input,
1868 oracle_announcements,
1869 counter_payout,
1870 REFUND_DELAY,
1871 PEER_TIMEOUT,
1872 CET_NSEQUENCE,
1873 &self.signer_provider,
1874 &self.time,
1875 )?;
1876
1877 let counter_party = offered_contract.counter_party;
1878
1879 self.store
1880 .upsert_channel(
1881 Channel::Signed(signed_channel),
1882 Some(Contract::Offered(offered_contract)),
1883 )
1884 .await?;
1885
1886 Ok((msg, counter_party))
1887 }
1888
1889 pub async fn accept_renew_offer(
1893 &self,
1894 channel_id: &ChannelId,
1895 ) -> Result<(RenewAccept, PublicKey), Error> {
1896 let mut signed_channel =
1897 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1898 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1899 Error::InvalidState("Expected to have a contract id but did not.".to_string())
1900 })?;
1901
1902 let offered_contract = get_contract_in_state!(
1903 self,
1904 &offered_contract_id,
1905 Offered,
1906 None as Option<PublicKey>
1907 )?;
1908
1909 let (accepted_contract, msg) = crate::channel_updater::accept_channel_renewal(
1910 &self.secp,
1911 &mut signed_channel,
1912 &offered_contract,
1913 CET_NSEQUENCE,
1914 PEER_TIMEOUT,
1915 &self.signer_provider,
1916 &self.time,
1917 )?;
1918
1919 let counter_party = signed_channel.counter_party;
1920
1921 self.store
1922 .upsert_channel(
1923 Channel::Signed(signed_channel),
1924 Some(Contract::Accepted(accepted_contract)),
1925 )
1926 .await?;
1927
1928 Ok((msg, counter_party))
1929 }
1930
1931 pub async fn reject_renew_offer(
1935 &self,
1936 channel_id: &ChannelId,
1937 ) -> Result<(Reject, PublicKey), Error> {
1938 let mut signed_channel =
1939 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1940 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
1941 Error::InvalidState(
1942 "Expected to be in a state with an associated contract id but was not.".to_string(),
1943 )
1944 })?;
1945
1946 let offered_contract = get_contract_in_state!(
1947 self,
1948 &offered_contract_id,
1949 Offered,
1950 None as Option<PublicKey>
1951 )?;
1952
1953 let reject_msg = crate::channel_updater::reject_renew_offer(&mut signed_channel)?;
1954
1955 let counter_party = signed_channel.counter_party;
1956
1957 self.store
1958 .upsert_channel(
1959 Channel::Signed(signed_channel),
1960 Some(Contract::Rejected(offered_contract)),
1961 )
1962 .await?;
1963
1964 Ok((reject_msg, counter_party))
1965 }
1966
1967 pub async fn reject_settle_offer(
1971 &self,
1972 channel_id: &ChannelId,
1973 ) -> Result<(Reject, PublicKey), Error> {
1974 let mut signed_channel =
1975 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
1976
1977 let msg = crate::channel_updater::reject_settle_offer(&mut signed_channel)?;
1978
1979 let counter_party = signed_channel.counter_party;
1980
1981 self.store
1982 .upsert_channel(Channel::Signed(signed_channel), None)
1983 .await?;
1984
1985 Ok((msg, counter_party))
1986 }
1987
1988 pub async fn offer_collaborative_close(
1993 &self,
1994 channel_id: &ChannelId,
1995 counter_payout: Amount,
1996 additional_inputs: Vec<OutPoint>,
1997 ) -> Result<CollaborativeCloseOffer, Error> {
1998 let mut signed_channel =
1999 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2000
2001 let (msg, close_tx) = crate::channel_updater::offer_collaborative_close(
2002 &self.secp,
2003 &mut signed_channel,
2004 counter_payout,
2005 additional_inputs,
2006 &self.signer_provider,
2007 &self.time,
2008 )?;
2009
2010 self.chain_monitor.lock().await.add_tx(
2011 close_tx.compute_txid(),
2012 ChannelInfo {
2013 channel_id: *channel_id,
2014 tx_type: TxType::CollaborativeClose,
2015 },
2016 );
2017
2018 self.store
2019 .upsert_channel(Channel::Signed(signed_channel), None)
2020 .await?;
2021 self.store
2022 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2023 .await?;
2024
2025 Ok(msg)
2026 }
2027
2028 pub async fn accept_collaborative_close(&self, channel_id: &ChannelId) -> Result<(), Error> {
2031 let signed_channel =
2032 get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;
2033
2034 let closed_contract = if let Some(SignedChannelState::Established {
2035 signed_contract_id,
2036 ..
2037 }) = &signed_channel.roll_back_state
2038 {
2039 let counter_payout = get_signed_channel_state!(
2040 signed_channel,
2041 CollaborativeCloseOffered,
2042 counter_payout
2043 )?;
2044 Some(
2045 self.get_collaboratively_closed_contract(signed_contract_id, *counter_payout, true)
2046 .await?,
2047 )
2048 } else {
2049 None
2050 };
2051
2052 let (close_tx, closed_channel) = crate::channel_updater::accept_collaborative_close_offer(
2053 &self.secp,
2054 &signed_channel,
2055 &self.signer_provider,
2056 )?;
2057
2058 self.blockchain.send_transaction(&close_tx).await?;
2059
2060 self.store.upsert_channel(closed_channel, None).await?;
2061
2062 if let Some(closed_contract) = closed_contract {
2063 self.store
2064 .update_contract(&Contract::Closed(closed_contract))
2065 .await?;
2066 }
2067
2068 Ok(())
2069 }
2070
2071 async fn try_finalize_closing_established_channel(
2072 &self,
2073 signed_channel: SignedChannel,
2074 ) -> Result<(), Error> {
2075 let (buffer_tx, contract_id, &is_initiator) = get_signed_channel_state!(
2076 signed_channel,
2077 Closing,
2078 buffer_transaction,
2079 contract_id,
2080 is_initiator
2081 )?;
2082
2083 if self
2084 .blockchain
2085 .get_transaction_confirmations(&buffer_tx.compute_txid())
2086 .await?
2087 >= CET_NSEQUENCE
2088 {
2089 log_info!(
2090 self.logger,
2091 "Buffer transaction for contract {} has enough confirmations to spend from it",
2092 serialize_hex(&contract_id)
2093 );
2094
2095 let confirmed_contract =
2096 get_contract_in_state!(self, &contract_id, Confirmed, None as Option<PublicKey>)?;
2097
2098 let (contract_info, adaptor_info, attestations) = self
2099 .get_closable_contract_info(&confirmed_contract)
2100 .await
2101 .ok_or_else(|| {
2102 Error::InvalidState("Could not get information to close contract".to_string())
2103 })?;
2104
2105 let (signed_cet, closed_channel) =
2106 crate::channel_updater::finalize_unilateral_close_settled_channel(
2107 &self.secp,
2108 &signed_channel,
2109 &confirmed_contract,
2110 contract_info,
2111 &attestations,
2112 adaptor_info,
2113 &self.signer_provider,
2114 is_initiator,
2115 )?;
2116
2117 let closed_contract = self
2118 .close_contract(
2119 &confirmed_contract,
2120 signed_cet,
2121 attestations.iter().map(|x| &x.1).cloned().collect(),
2122 )
2123 .await?;
2124
2125 self.chain_monitor
2126 .lock()
2127 .await
2128 .cleanup_channel(signed_channel.channel_id);
2129
2130 self.store
2131 .upsert_channel(closed_channel, Some(closed_contract))
2132 .await?;
2133 }
2134
2135 Ok(())
2136 }
2137
2138 async fn on_offer_channel(
2139 &self,
2140 offer_channel: &OfferChannel,
2141 counter_party: PublicKey,
2142 ) -> Result<(), Error> {
2143 offer_channel.validate(
2144 &self.secp,
2145 REFUND_DELAY,
2146 REFUND_DELAY * 2,
2147 CET_NSEQUENCE,
2148 CET_NSEQUENCE * 2,
2149 )?;
2150
2151 let keys_id = self
2152 .signer_provider
2153 .derive_signer_key_id(false, offer_channel.temporary_contract_id);
2154 let (channel, contract) =
2155 OfferedChannel::from_offer_channel(offer_channel, counter_party, keys_id)?;
2156
2157 contract.validate()?;
2158
2159 if self
2160 .store
2161 .get_channel(&channel.temporary_channel_id)
2162 .await?
2163 .is_some()
2164 {
2165 return Err(Error::InvalidParameters(
2166 "Channel with identical idea already in store".to_string(),
2167 ));
2168 }
2169
2170 self.store
2171 .upsert_channel(Channel::Offered(channel), Some(Contract::Offered(contract)))
2172 .await?;
2173
2174 Ok(())
2175 }
2176
2177 async fn on_accept_channel(
2178 &self,
2179 accept_channel: &AcceptChannel,
2180 peer_id: &PublicKey,
2181 ) -> Result<SignChannel, Error> {
2182 let offered_channel = get_channel_in_state!(
2183 self,
2184 &accept_channel.temporary_channel_id,
2185 Offered,
2186 Some(*peer_id)
2187 )?;
2188 let offered_contract = get_contract_in_state!(
2189 self,
2190 &offered_channel.offered_contract_id,
2191 Offered,
2192 Some(*peer_id)
2193 )?;
2194
2195 let (signed_channel, signed_contract, sign_channel) = {
2196 let res = crate::channel_updater::verify_and_sign_accepted_channel(
2197 &self.secp,
2198 &offered_channel,
2199 &offered_contract,
2200 accept_channel,
2201 CET_NSEQUENCE,
2203 &self.wallet,
2204 &self.signer_provider,
2205 &self.store,
2206 &self.chain_monitor,
2207 &self.logger,
2208 )
2209 .await;
2210
2211 match res {
2212 Ok(res) => res,
2213 Err(e) => {
2214 let channel = crate::channel::FailedAccept {
2215 temporary_channel_id: accept_channel.temporary_channel_id,
2216 error_message: format!("Error validating accept channel: {e}"),
2217 accept_message: accept_channel.clone(),
2218 counter_party: *peer_id,
2219 };
2220 self.store
2221 .upsert_channel(Channel::FailedAccept(channel), None)
2222 .await?;
2223 return Err(e);
2224 }
2225 }
2226 };
2227
2228 self.wallet.import_address(&Address::p2wsh(
2229 &signed_contract
2230 .accepted_contract
2231 .dlc_transactions
2232 .funding_script_pubkey,
2233 self.blockchain.get_network()?,
2234 ))?;
2235
2236 if let SignedChannelState::Established {
2237 buffer_transaction, ..
2238 } = &signed_channel.state
2239 {
2240 self.chain_monitor.lock().await.add_tx(
2241 buffer_transaction.compute_txid(),
2242 ChannelInfo {
2243 channel_id: signed_channel.channel_id,
2244 tx_type: TxType::BufferTx,
2245 },
2246 );
2247 } else {
2248 unreachable!();
2249 }
2250
2251 self.store
2252 .upsert_channel(
2253 Channel::Signed(signed_channel),
2254 Some(Contract::Signed(signed_contract)),
2255 )
2256 .await?;
2257
2258 self.store
2259 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2260 .await?;
2261
2262 Ok(sign_channel)
2263 }
2264
2265 async fn on_sign_channel(
2266 &self,
2267 sign_channel: &SignChannel,
2268 peer_id: &PublicKey,
2269 ) -> Result<(), Error> {
2270 let accepted_channel =
2271 get_channel_in_state!(self, &sign_channel.channel_id, Accepted, Some(*peer_id))?;
2272 let accepted_contract = get_contract_in_state!(
2273 self,
2274 &accepted_channel.accepted_contract_id,
2275 Accepted,
2276 Some(*peer_id)
2277 )?;
2278
2279 let (signed_channel, signed_contract, signed_fund_tx) = {
2280 let res = verify_signed_channel(
2281 &self.secp,
2282 &accepted_channel,
2283 &accepted_contract,
2284 sign_channel,
2285 &self.wallet,
2286 &self.chain_monitor,
2287 &self.store,
2288 &self.signer_provider,
2289 &self.logger,
2290 )
2291 .await;
2292
2293 match res {
2294 Ok(res) => res,
2295 Err(e) => {
2296 let channel = crate::channel::FailedSign {
2297 channel_id: sign_channel.channel_id,
2298 error_message: format!("Error validating accept channel: {e}"),
2299 sign_message: sign_channel.clone(),
2300 counter_party: *peer_id,
2301 };
2302 self.store
2303 .upsert_channel(Channel::FailedSign(channel), None)
2304 .await?;
2305 return Err(e);
2306 }
2307 }
2308 };
2309
2310 if let SignedChannelState::Established {
2311 buffer_transaction, ..
2312 } = &signed_channel.state
2313 {
2314 self.chain_monitor.lock().await.add_tx(
2315 buffer_transaction.compute_txid(),
2316 ChannelInfo {
2317 channel_id: signed_channel.channel_id,
2318 tx_type: TxType::BufferTx,
2319 },
2320 );
2321 } else {
2322 unreachable!();
2323 }
2324
2325 self.blockchain.send_transaction(&signed_fund_tx).await?;
2326
2327 self.store
2328 .upsert_channel(
2329 Channel::Signed(signed_channel),
2330 Some(Contract::Signed(signed_contract)),
2331 )
2332 .await?;
2333 self.store
2334 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2335 .await?;
2336
2337 Ok(())
2338 }
2339
2340 async fn on_settle_offer(
2341 &self,
2342 settle_offer: &SettleOffer,
2343 peer_id: &PublicKey,
2344 ) -> Result<Option<Reject>, Error> {
2345 let mut signed_channel =
2346 get_channel_in_state!(self, &settle_offer.channel_id, Signed, Some(*peer_id))?;
2347
2348 if let SignedChannelState::SettledOffered { .. } = signed_channel.state {
2349 return Ok(Some(Reject {
2350 channel_id: settle_offer.channel_id,
2351 }));
2352 }
2353
2354 crate::channel_updater::on_settle_offer(&mut signed_channel, settle_offer)?;
2355
2356 self.store
2357 .upsert_channel(Channel::Signed(signed_channel), None)
2358 .await?;
2359
2360 Ok(None)
2361 }
2362
2363 async fn on_settle_accept(
2364 &self,
2365 settle_accept: &SettleAccept,
2366 peer_id: &PublicKey,
2367 ) -> Result<SettleConfirm, Error> {
2368 let mut signed_channel =
2369 get_channel_in_state!(self, &settle_accept.channel_id, Signed, Some(*peer_id))?;
2370
2371 let msg = crate::channel_updater::settle_channel_confirm(
2372 &self.secp,
2373 &mut signed_channel,
2374 settle_accept,
2375 CET_NSEQUENCE,
2376 0,
2377 PEER_TIMEOUT,
2378 &self.signer_provider,
2379 &self.time,
2380 &self.chain_monitor,
2381 )
2382 .await?;
2383
2384 self.store
2385 .upsert_channel(Channel::Signed(signed_channel), None)
2386 .await?;
2387
2388 Ok(msg)
2389 }
2390
2391 async fn on_settle_confirm(
2392 &self,
2393 settle_confirm: &SettleConfirm,
2394 peer_id: &PublicKey,
2395 ) -> Result<SettleFinalize, Error> {
2396 let mut signed_channel =
2397 get_channel_in_state!(self, &settle_confirm.channel_id, Signed, Some(*peer_id))?;
2398 let &own_payout = get_signed_channel_state!(signed_channel, SettledAccepted, own_payout)?;
2399 let (prev_buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2400 signed_channel,
2401 Established,
2402 buffer_transaction,
2403 own_buffer_adaptor_signature,
2404 is_offer,
2405 signed_contract_id
2406 )?;
2407
2408 let prev_buffer_txid = prev_buffer_tx.compute_txid();
2409 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2410 let is_offer = *is_offer;
2411 let signed_contract_id = *signed_contract_id;
2412
2413 let msg = crate::channel_updater::settle_channel_finalize(
2414 &self.secp,
2415 &mut signed_channel,
2416 settle_confirm,
2417 &self.signer_provider,
2418 )?;
2419
2420 self.chain_monitor.lock().await.add_tx(
2421 prev_buffer_txid,
2422 ChannelInfo {
2423 channel_id: signed_channel.channel_id,
2424 tx_type: TxType::Revoked {
2425 update_idx: signed_channel.update_idx + 1,
2426 own_adaptor_signature: own_buffer_adaptor_signature,
2427 is_offer,
2428 revoked_tx_type: RevokedTxType::Buffer,
2429 },
2430 },
2431 );
2432
2433 let closed_contract = Contract::Closed(
2434 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2435 .await?,
2436 );
2437
2438 self.store
2439 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2440 .await?;
2441 self.store
2442 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2443 .await?;
2444
2445 Ok(msg)
2446 }
2447
2448 async fn on_settle_finalize(
2449 &self,
2450 settle_finalize: &SettleFinalize,
2451 peer_id: &PublicKey,
2452 ) -> Result<(), Error> {
2453 let mut signed_channel =
2454 get_channel_in_state!(self, &settle_finalize.channel_id, Signed, Some(*peer_id))?;
2455 let &own_payout = get_signed_channel_state!(signed_channel, SettledConfirmed, own_payout)?;
2456 let (buffer_tx, own_buffer_adaptor_signature, is_offer, signed_contract_id) = get_signed_channel_rollback_state!(
2457 signed_channel,
2458 Established,
2459 buffer_transaction,
2460 own_buffer_adaptor_signature,
2461 is_offer,
2462 signed_contract_id
2463 )?;
2464
2465 let own_buffer_adaptor_signature = *own_buffer_adaptor_signature;
2466 let is_offer = *is_offer;
2467 let buffer_txid = buffer_tx.compute_txid();
2468 let signed_contract_id = *signed_contract_id;
2469
2470 crate::channel_updater::settle_channel_on_finalize(
2471 &self.secp,
2472 &mut signed_channel,
2473 settle_finalize,
2474 )?;
2475
2476 self.chain_monitor.lock().await.add_tx(
2477 buffer_txid,
2478 ChannelInfo {
2479 channel_id: signed_channel.channel_id,
2480 tx_type: TxType::Revoked {
2481 update_idx: signed_channel.update_idx + 1,
2482 own_adaptor_signature: own_buffer_adaptor_signature,
2483 is_offer,
2484 revoked_tx_type: RevokedTxType::Buffer,
2485 },
2486 },
2487 );
2488
2489 let closed_contract = Contract::Closed(
2490 self.get_collaboratively_closed_contract(&signed_contract_id, own_payout, true)
2491 .await?,
2492 );
2493 self.store
2494 .upsert_channel(Channel::Signed(signed_channel), Some(closed_contract))
2495 .await?;
2496 self.store
2497 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2498 .await?;
2499
2500 Ok(())
2501 }
2502
2503 async fn on_renew_offer(
2504 &self,
2505 renew_offer: &RenewOffer,
2506 peer_id: &PublicKey,
2507 ) -> Result<Option<Reject>, Error> {
2508 let mut signed_channel =
2509 get_channel_in_state!(self, &renew_offer.channel_id, Signed, Some(*peer_id))?;
2510
2511 if let SignedChannelState::RenewOffered { is_offer, .. } = signed_channel.state {
2513 if is_offer {
2514 return Ok(Some(Reject {
2515 channel_id: renew_offer.channel_id,
2516 }));
2517 }
2518 }
2519
2520 let offered_contract = crate::channel_updater::on_renew_offer(
2521 &mut signed_channel,
2522 renew_offer,
2523 PEER_TIMEOUT,
2524 &self.time,
2525 )?;
2526
2527 self.store.create_contract(&offered_contract).await?;
2528 self.store
2529 .upsert_channel(Channel::Signed(signed_channel), None)
2530 .await?;
2531
2532 Ok(None)
2533 }
2534
2535 async fn on_renew_accept(
2536 &self,
2537 renew_accept: &RenewAccept,
2538 peer_id: &PublicKey,
2539 ) -> Result<RenewConfirm, Error> {
2540 let mut signed_channel =
2541 get_channel_in_state!(self, &renew_accept.channel_id, Signed, Some(*peer_id))?;
2542 let offered_contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2543 Error::InvalidState(
2544 "Expected to be in a state with an associated contract id but was not.".to_string(),
2545 )
2546 })?;
2547
2548 let offered_contract =
2549 get_contract_in_state!(self, &offered_contract_id, Offered, Some(*peer_id))?;
2550
2551 let (signed_contract, msg) = crate::channel_updater::verify_renew_accept_and_confirm(
2552 &self.secp,
2553 renew_accept,
2554 &mut signed_channel,
2555 &offered_contract,
2556 CET_NSEQUENCE,
2557 PEER_TIMEOUT,
2558 &self.wallet,
2559 &self.signer_provider,
2560 &self.time,
2561 &self.store,
2562 &self.logger,
2563 )
2564 .await?;
2565
2566 self.store
2568 .upsert_channel(
2569 Channel::Signed(signed_channel),
2570 Some(Contract::Confirmed(signed_contract)),
2571 )
2572 .await?;
2573
2574 Ok(msg)
2575 }
2576
2577 async fn on_renew_confirm(
2578 &self,
2579 renew_confirm: &RenewConfirm,
2580 peer_id: &PublicKey,
2581 ) -> Result<RenewFinalize, Error> {
2582 let mut signed_channel =
2583 get_channel_in_state!(self, &renew_confirm.channel_id, Signed, Some(*peer_id))?;
2584 let own_payout = get_signed_channel_state!(signed_channel, RenewAccepted, own_payout)?;
2585 let contract_id = signed_channel.get_contract_id().ok_or_else(|| {
2586 Error::InvalidState(
2587 "Expected to be in a state with an associated contract id but was not.".to_string(),
2588 )
2589 })?;
2590
2591 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2592 .roll_back_state
2593 .as_ref()
2594 .expect("to have a rollback state")
2595 {
2596 SignedChannelState::Established {
2597 own_buffer_adaptor_signature,
2598 buffer_transaction,
2599 signed_contract_id,
2600 ..
2601 } => {
2602 let closed_contract = Contract::Closed(
2603 self.get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2604 .await?,
2605 );
2606 (
2607 TxType::Revoked {
2608 update_idx: signed_channel.update_idx,
2609 own_adaptor_signature: *own_buffer_adaptor_signature,
2610 is_offer: false,
2611 revoked_tx_type: RevokedTxType::Buffer,
2612 },
2613 buffer_transaction.compute_txid(),
2614 Some(closed_contract),
2615 )
2616 }
2617 SignedChannelState::Settled {
2618 settle_tx,
2619 own_settle_adaptor_signature,
2620 ..
2621 } => (
2622 TxType::Revoked {
2623 update_idx: signed_channel.update_idx,
2624 own_adaptor_signature: *own_settle_adaptor_signature,
2625 is_offer: false,
2626 revoked_tx_type: RevokedTxType::Settle,
2627 },
2628 settle_tx.compute_txid(),
2629 None,
2630 ),
2631 s => {
2632 return Err(Error::InvalidState(format!(
2633 "Expected rollback state Established or Revoked but found {s:?}"
2634 )))
2635 }
2636 };
2637 let accepted_contract =
2638 get_contract_in_state!(self, &contract_id, Accepted, Some(*peer_id))?;
2639
2640 let (signed_contract, msg) = crate::channel_updater::verify_renew_confirm_and_finalize(
2641 &self.secp,
2642 &mut signed_channel,
2643 &accepted_contract,
2644 renew_confirm,
2645 PEER_TIMEOUT,
2646 &self.time,
2647 &self.wallet,
2648 &self.signer_provider,
2649 &self.chain_monitor,
2650 &self.store,
2651 &self.logger,
2652 )
2653 .await?;
2654
2655 self.chain_monitor.lock().await.add_tx(
2656 prev_tx_id,
2657 ChannelInfo {
2658 channel_id: signed_channel.channel_id,
2659 tx_type,
2660 },
2661 );
2662
2663 self.store
2665 .upsert_channel(
2666 Channel::Signed(signed_channel),
2667 Some(Contract::Confirmed(signed_contract)),
2668 )
2669 .await?;
2670
2671 self.store
2672 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2673 .await?;
2674
2675 if let Some(closed_contract) = closed_contract {
2676 self.store.update_contract(&closed_contract).await?;
2677 }
2678
2679 Ok(msg)
2680 }
2681
2682 async fn on_renew_finalize(
2683 &self,
2684 renew_finalize: &RenewFinalize,
2685 peer_id: &PublicKey,
2686 ) -> Result<RenewRevoke, Error> {
2687 let mut signed_channel =
2688 get_channel_in_state!(self, &renew_finalize.channel_id, Signed, Some(*peer_id))?;
2689 let own_payout = get_signed_channel_state!(signed_channel, RenewConfirmed, own_payout)?;
2690
2691 let (tx_type, prev_tx_id, closed_contract) = match signed_channel
2692 .roll_back_state
2693 .as_ref()
2694 .expect("to have a rollback state")
2695 {
2696 SignedChannelState::Established {
2697 own_buffer_adaptor_signature,
2698 buffer_transaction,
2699 signed_contract_id,
2700 ..
2701 } => {
2702 let closed_contract = self
2703 .get_collaboratively_closed_contract(signed_contract_id, *own_payout, true)
2704 .await?;
2705 (
2706 TxType::Revoked {
2707 update_idx: signed_channel.update_idx,
2708 own_adaptor_signature: *own_buffer_adaptor_signature,
2709 is_offer: false,
2710 revoked_tx_type: RevokedTxType::Buffer,
2711 },
2712 buffer_transaction.compute_txid(),
2713 Some(Contract::Closed(closed_contract)),
2714 )
2715 }
2716 SignedChannelState::Settled {
2717 settle_tx,
2718 own_settle_adaptor_signature,
2719 ..
2720 } => (
2721 TxType::Revoked {
2722 update_idx: signed_channel.update_idx,
2723 own_adaptor_signature: *own_settle_adaptor_signature,
2724 is_offer: false,
2725 revoked_tx_type: RevokedTxType::Settle,
2726 },
2727 settle_tx.compute_txid(),
2728 None,
2729 ),
2730 s => {
2731 return Err(Error::InvalidState(format!(
2732 "Expected rollback state of Established or Settled but was {s:?}"
2733 )))
2734 }
2735 };
2736
2737 let msg = crate::channel_updater::renew_channel_on_finalize(
2738 &self.secp,
2739 &mut signed_channel,
2740 renew_finalize,
2741 &self.signer_provider,
2742 )?;
2743
2744 self.chain_monitor.lock().await.add_tx(
2745 prev_tx_id,
2746 ChannelInfo {
2747 channel_id: signed_channel.channel_id,
2748 tx_type,
2749 },
2750 );
2751
2752 let buffer_tx =
2753 get_signed_channel_state!(signed_channel, Established, ref buffer_transaction)?;
2754
2755 self.chain_monitor.lock().await.add_tx(
2756 buffer_tx.compute_txid(),
2757 ChannelInfo {
2758 channel_id: signed_channel.channel_id,
2759 tx_type: TxType::BufferTx,
2760 },
2761 );
2762
2763 self.store
2764 .upsert_channel(Channel::Signed(signed_channel), None)
2765 .await?;
2766 self.store
2767 .persist_chain_monitor(&*self.chain_monitor.lock().await)
2768 .await?;
2769
2770 if let Some(closed_contract) = closed_contract {
2771 self.store.update_contract(&closed_contract).await?;
2772 }
2773
2774 Ok(msg)
2775 }
2776
2777 async fn on_renew_revoke(
2778 &self,
2779 renew_revoke: &RenewRevoke,
2780 peer_id: &PublicKey,
2781 ) -> Result<(), Error> {
2782 let mut signed_channel =
2783 get_channel_in_state!(self, &renew_revoke.channel_id, Signed, Some(*peer_id))?;
2784
2785 crate::channel_updater::renew_channel_on_revoke(
2786 &self.secp,
2787 &mut signed_channel,
2788 renew_revoke,
2789 )?;
2790
2791 self.store
2792 .upsert_channel(Channel::Signed(signed_channel), None)
2793 .await?;
2794
2795 Ok(())
2796 }
2797
2798 async fn on_collaborative_close_offer(
2799 &self,
2800 close_offer: &CollaborativeCloseOffer,
2801 peer_id: &PublicKey,
2802 ) -> Result<(), Error> {
2803 let mut signed_channel =
2804 get_channel_in_state!(self, &close_offer.channel_id, Signed, Some(*peer_id))?;
2805
2806 crate::channel_updater::on_collaborative_close_offer(
2807 &mut signed_channel,
2808 close_offer,
2809 PEER_TIMEOUT,
2810 &self.time,
2811 )?;
2812
2813 self.store
2814 .upsert_channel(Channel::Signed(signed_channel), None)
2815 .await?;
2816
2817 Ok(())
2818 }
2819
2820 async fn on_reject(&self, reject: &Reject, counter_party: &PublicKey) -> Result<(), Error> {
2821 let channel = self.store.get_channel(&reject.channel_id).await?;
2822
2823 if let Some(channel) = channel {
2824 if channel.get_counter_party_id() != *counter_party {
2825 return Err(Error::InvalidParameters(format!(
2826 "Peer {:02x?} is not involved with {} {:02x?}.",
2827 counter_party,
2828 stringify!(Channel),
2829 channel.get_id()
2830 )));
2831 }
2832 match channel {
2833 Channel::Offered(offered_channel) => {
2834 let offered_contract = get_contract_in_state!(
2835 self,
2836 &offered_channel.offered_contract_id,
2837 Offered,
2838 None as Option<PublicKey>
2839 )?;
2840 let utxos = offered_contract
2841 .funding_inputs
2842 .iter()
2843 .map(|funding_input| {
2844 let txid = Transaction::consensus_decode(
2845 &mut funding_input.prev_tx.as_slice(),
2846 )
2847 .expect("Transaction Decode Error")
2848 .compute_txid();
2849 let vout = funding_input.prev_tx_vout;
2850 OutPoint { txid, vout }
2851 })
2852 .collect::<Vec<_>>();
2853
2854 self.wallet.unreserve_utxos(&utxos)?;
2855
2856 self.store
2858 .upsert_channel(
2859 Channel::Cancelled(offered_channel),
2860 Some(Contract::Rejected(offered_contract)),
2861 )
2862 .await?;
2863 }
2864 Channel::Signed(mut signed_channel) => {
2865 let contract = match signed_channel.state {
2866 SignedChannelState::RenewOffered {
2867 offered_contract_id,
2868 ..
2869 } => {
2870 let offered_contract = get_contract_in_state!(
2871 self,
2872 &offered_contract_id,
2873 Offered,
2874 None::<PublicKey>
2875 )?;
2876 Some(Contract::Rejected(offered_contract))
2877 }
2878 _ => None,
2879 };
2880
2881 crate::channel_updater::on_reject(&mut signed_channel)?;
2882
2883 self.store
2884 .upsert_channel(Channel::Signed(signed_channel), contract)
2885 .await?;
2886 }
2887 channel => {
2888 return Err(Error::InvalidState(format!(
2889 "Not in a state adequate to receive a reject message. {channel:?}"
2890 )))
2891 }
2892 }
2893 } else {
2894 log_warn!(
2895 self.logger,
2896 "Couldn't find rejected dlc channel with id: {}",
2897 reject.channel_id.to_lower_hex_string()
2898 );
2899 }
2900
2901 Ok(())
2902 }
2903
2904 async fn channel_checks(&self) -> Result<(), Error> {
2905 let established_closing_channels = self
2906 .store
2907 .get_signed_channels(Some(SignedChannelStateType::Closing))
2908 .await?;
2909
2910 for channel in established_closing_channels {
2911 if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
2912 log_error!(
2913 self.logger,
2914 "Error trying to close established channel: {}",
2915 e
2916 );
2917 }
2918 }
2919
2920 if let Err(e) = self.check_for_timed_out_channels().await {
2921 log_error!(self.logger, "Error checking timed out channels {}", e);
2922 }
2923 self.check_for_watched_tx().await
2924 }
2925
2926 async fn check_for_timed_out_channels(&self) -> Result<(), Error> {
2927 check_for_timed_out_channels!(self, RenewOffered);
2928 check_for_timed_out_channels!(self, RenewAccepted);
2929 check_for_timed_out_channels!(self, RenewConfirmed);
2930 check_for_timed_out_channels!(self, SettledOffered);
2931 check_for_timed_out_channels!(self, SettledAccepted);
2932 check_for_timed_out_channels!(self, SettledConfirmed);
2933
2934 Ok(())
2935 }
2936
2937 pub(crate) async fn process_watched_txs(
2938 &self,
2939 watched_txs: Vec<(Transaction, ChannelInfo)>,
2940 ) -> Result<(), Error> {
2941 for (tx, channel_info) in watched_txs {
2942 let mut signed_channel = match get_channel_in_state!(
2943 self,
2944 &channel_info.channel_id,
2945 Signed,
2946 None as Option<PublicKey>
2947 ) {
2948 Ok(c) => c,
2949 Err(e) => {
2950 log_error!(
2951 self.logger,
2952 "Could not retrieve channel {:?}: {}",
2953 channel_info.channel_id,
2954 e
2955 );
2956 continue;
2957 }
2958 };
2959
2960 let persist = match channel_info.tx_type {
2961 TxType::BufferTx => {
2962 let contract_id = signed_channel
2969 .get_contract_id()
2970 .expect("to have a contract id");
2971 let mut state = SignedChannelState::Closing {
2972 buffer_transaction: tx.clone(),
2973 is_initiator: false,
2974 contract_id,
2975 keys_id: signed_channel.keys_id().ok_or_else(|| {
2976 Error::InvalidState("Expected to have keys_id.".to_string())
2977 })?,
2978 };
2979 std::mem::swap(&mut signed_channel.state, &mut state);
2980
2981 signed_channel.roll_back_state = Some(state);
2982
2983 self.store
2984 .upsert_channel(Channel::Signed(signed_channel), None)
2985 .await?;
2986
2987 false
2988 }
2989 TxType::Revoked {
2990 update_idx,
2991 own_adaptor_signature,
2992 is_offer,
2993 revoked_tx_type,
2994 } => {
2995 let secret = signed_channel
2996 .counter_party_commitment_secrets
2997 .get_secret(update_idx)
2998 .expect("to be able to retrieve the per update secret");
2999 let counter_per_update_secret = SecretKey::from_slice(&secret)
3000 .expect("to be able to parse the counter per update secret.");
3001
3002 let per_update_seed_pk = signed_channel.own_per_update_seed;
3003
3004 let per_update_seed_sk = self
3005 .signer_provider
3006 .get_secret_key_for_pubkey(&per_update_seed_pk)?;
3007
3008 let per_update_secret = SecretKey::from_slice(&build_commitment_secret(
3009 per_update_seed_sk.as_ref(),
3010 update_idx,
3011 ))
3012 .expect("a valid secret key.");
3013
3014 let per_update_point =
3015 PublicKey::from_secret_key(&self.secp, &per_update_secret);
3016
3017 let own_revocation_params = signed_channel.own_points.get_revokable_params(
3018 &self.secp,
3019 &signed_channel.counter_points.revocation_basepoint,
3020 &per_update_point,
3021 );
3022
3023 let counter_per_update_point =
3024 PublicKey::from_secret_key(&self.secp, &counter_per_update_secret);
3025
3026 let base_own_sk = self
3027 .signer_provider
3028 .get_secret_key_for_pubkey(&signed_channel.own_points.own_basepoint)?;
3029
3030 let own_sk = derive_private_key(&self.secp, &per_update_point, &base_own_sk);
3031
3032 let counter_revocation_params =
3033 signed_channel.counter_points.get_revokable_params(
3034 &self.secp,
3035 &signed_channel.own_points.revocation_basepoint,
3036 &counter_per_update_point,
3037 );
3038
3039 let witness = if signed_channel.own_params.fund_pubkey
3040 < signed_channel.counter_params.fund_pubkey
3041 {
3042 tx.input[0].witness.to_vec().remove(1)
3043 } else {
3044 tx.input[0].witness.to_vec().remove(2)
3045 };
3046
3047 let sig_data = witness
3048 .iter()
3049 .take(witness.len() - 1)
3050 .cloned()
3051 .collect::<Vec<_>>();
3052 let own_sig = Signature::from_der(&sig_data)?;
3053
3054 let counter_sk = own_adaptor_signature.recover(
3055 &self.secp,
3056 &own_sig,
3057 &counter_revocation_params.publish_pk.inner,
3058 )?;
3059
3060 let own_revocation_base_secret =
3061 &self.signer_provider.get_secret_key_for_pubkey(
3062 &signed_channel.own_points.revocation_basepoint,
3063 )?;
3064
3065 let counter_revocation_sk = derive_private_revocation_key(
3066 &self.secp,
3067 &counter_per_update_secret,
3068 own_revocation_base_secret,
3069 );
3070
3071 let (offer_params, accept_params) = if is_offer {
3072 (&own_revocation_params, &counter_revocation_params)
3073 } else {
3074 (&counter_revocation_params, &own_revocation_params)
3075 };
3076
3077 let fee_rate_per_vb: u64 = (self.fee_estimator.get_est_sat_per_1000_weight(
3078 lightning::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep,
3079 ) / 250)
3080 .into();
3081
3082 let signed_tx = match revoked_tx_type {
3083 RevokedTxType::Buffer => {
3084 ddk_dlc::channel::create_and_sign_punish_buffer_transaction(
3085 &self.secp,
3086 offer_params,
3087 accept_params,
3088 &own_sk,
3089 &counter_sk,
3090 &counter_revocation_sk,
3091 &tx,
3092 &self.wallet.get_new_address().await?,
3093 0,
3094 fee_rate_per_vb,
3095 )?
3096 }
3097 RevokedTxType::Settle => {
3098 ddk_dlc::channel::create_and_sign_punish_settle_transaction(
3099 &self.secp,
3100 offer_params,
3101 accept_params,
3102 &own_sk,
3103 &counter_sk,
3104 &counter_revocation_sk,
3105 &tx,
3106 &self.wallet.get_new_address().await?,
3107 CET_NSEQUENCE,
3108 0,
3109 fee_rate_per_vb,
3110 is_offer,
3111 )?
3112 }
3113 };
3114
3115 self.blockchain.send_transaction(&signed_tx).await?;
3116
3117 let closed_channel = Channel::ClosedPunished(ClosedPunishedChannel {
3118 counter_party: signed_channel.counter_party,
3119 temporary_channel_id: signed_channel.temporary_channel_id,
3120 channel_id: signed_channel.channel_id,
3121 punish_txid: signed_tx.compute_txid(),
3122 });
3123
3124 self.chain_monitor
3127 .lock()
3128 .await
3129 .cleanup_channel(signed_channel.channel_id);
3130 self.store.upsert_channel(closed_channel, None).await?;
3131 true
3132 }
3133 TxType::CollaborativeClose => {
3134 if let Some(SignedChannelState::Established {
3135 signed_contract_id, ..
3136 }) = signed_channel.roll_back_state
3137 {
3138 let counter_payout = get_signed_channel_state!(
3139 signed_channel,
3140 CollaborativeCloseOffered,
3141 counter_payout
3142 )?;
3143 let closed_contract = self
3144 .get_collaboratively_closed_contract(
3145 &signed_contract_id,
3146 *counter_payout,
3147 false,
3148 )
3149 .await?;
3150 self.store
3151 .update_contract(&Contract::Closed(closed_contract))
3152 .await?;
3153 }
3154 let closed_channel = Channel::CollaborativelyClosed(ClosedChannel {
3155 counter_party: signed_channel.counter_party,
3156 temporary_channel_id: signed_channel.temporary_channel_id,
3157 channel_id: signed_channel.channel_id,
3158 });
3159 self.chain_monitor
3160 .lock()
3161 .await
3162 .cleanup_channel(signed_channel.channel_id);
3163 self.store.upsert_channel(closed_channel, None).await?;
3164 true
3165 }
3166 TxType::SettleTx => {
3167 let closed_channel = Channel::CounterClosed(ClosedChannel {
3168 counter_party: signed_channel.counter_party,
3169 temporary_channel_id: signed_channel.temporary_channel_id,
3170 channel_id: signed_channel.channel_id,
3171 });
3172 self.chain_monitor
3173 .lock()
3174 .await
3175 .cleanup_channel(signed_channel.channel_id);
3176 self.store.upsert_channel(closed_channel, None).await?;
3177 true
3178 }
3179 TxType::Cet => {
3180 let contract_id = signed_channel.get_contract_id();
3181 let closed_channel = {
3182 match &signed_channel.state {
3183 SignedChannelState::Closing { is_initiator, .. } => {
3184 if *is_initiator {
3185 Channel::Closed(ClosedChannel {
3186 counter_party: signed_channel.counter_party,
3187 temporary_channel_id: signed_channel.temporary_channel_id,
3188 channel_id: signed_channel.channel_id,
3189 })
3190 } else {
3191 Channel::CounterClosed(ClosedChannel {
3192 counter_party: signed_channel.counter_party,
3193 temporary_channel_id: signed_channel.temporary_channel_id,
3194 channel_id: signed_channel.channel_id,
3195 })
3196 }
3197 }
3198 _ => {
3199 log_error!(self.logger, "Saw spending of buffer transaction without being in closing state");
3200 Channel::Closed(ClosedChannel {
3201 counter_party: signed_channel.counter_party,
3202 temporary_channel_id: signed_channel.temporary_channel_id,
3203 channel_id: signed_channel.channel_id,
3204 })
3205 }
3206 }
3207 };
3208
3209 self.chain_monitor
3210 .lock()
3211 .await
3212 .cleanup_channel(signed_channel.channel_id);
3213 let pre_closed_contract = futures::stream::iter(contract_id)
3214 .then(|contract_id| {
3215 let txn = tx.clone();
3216 async move {
3217 self.store.get_contract(&contract_id).await.map(|contract| {
3218 contract.and_then(|contract| match contract {
3219 Contract::Confirmed(signed_contract) => {
3220 Some(Contract::PreClosed(PreClosedContract {
3221 signed_contract,
3222 attestations: None,
3223 signed_cet: txn,
3224 }))
3225 }
3226 _ => None,
3227 })
3228 })
3229 }
3230 })
3231 .try_collect::<Vec<_>>()
3232 .await?
3233 .into_iter()
3234 .flatten()
3235 .next();
3236
3237 self.store
3238 .upsert_channel(closed_channel, pre_closed_contract)
3239 .await?;
3240
3241 true
3242 }
3243 };
3244
3245 if persist {
3246 self.store
3247 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3248 .await?;
3249 }
3250 }
3251 Ok(())
3252 }
3253
3254 async fn check_for_watched_tx(&self) -> Result<(), Error> {
3255 let confirmed_txs = self.chain_monitor.lock().await.confirmed_txs();
3256
3257 self.process_watched_txs(confirmed_txs).await?;
3258
3259 self.store
3260 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3261 .await?;
3262
3263 Ok(())
3264 }
3265
3266 async fn force_close_channel_internal(
3267 &self,
3268 mut channel: SignedChannel,
3269 is_initiator: bool,
3270 ) -> Result<(), Error> {
3271 match &channel.state {
3272 SignedChannelState::Established {
3273 counter_buffer_adaptor_signature,
3274 buffer_transaction,
3275 ..
3276 } => {
3277 let counter_buffer_adaptor_signature = *counter_buffer_adaptor_signature;
3278 let buffer_transaction = buffer_transaction.clone();
3279 self.initiate_unilateral_close_established_channel(
3280 channel,
3281 is_initiator,
3282 counter_buffer_adaptor_signature,
3283 buffer_transaction,
3284 )
3285 .await
3286 }
3287 SignedChannelState::RenewFinalized {
3288 buffer_transaction,
3289 offer_buffer_adaptor_signature,
3290 ..
3291 } => {
3292 let offer_buffer_adaptor_signature = *offer_buffer_adaptor_signature;
3293 let buffer_transaction = buffer_transaction.clone();
3294 self.initiate_unilateral_close_established_channel(
3295 channel,
3296 is_initiator,
3297 offer_buffer_adaptor_signature,
3298 buffer_transaction,
3299 )
3300 .await
3301 }
3302 SignedChannelState::Settled { .. } => {
3303 self.close_settled_channel(channel, is_initiator).await
3304 }
3305 SignedChannelState::SettledOffered { .. }
3306 | SignedChannelState::SettledReceived { .. }
3307 | SignedChannelState::SettledAccepted { .. }
3308 | SignedChannelState::SettledConfirmed { .. }
3309 | SignedChannelState::RenewOffered { .. }
3310 | SignedChannelState::RenewAccepted { .. }
3311 | SignedChannelState::RenewConfirmed { .. }
3312 | SignedChannelState::CollaborativeCloseOffered { .. } => {
3313 channel.state = channel
3314 .roll_back_state
3315 .take()
3316 .expect("to have a rollback state");
3317 let channel_clone = channel.clone(); Box::pin(self.force_close_channel_internal(channel_clone, is_initiator)).await
3319 }
3320 SignedChannelState::Closing { .. } => Err(Error::InvalidState(
3321 "Channel is already closing.".to_string(),
3322 )),
3323 }
3324 }
3325
3326 async fn initiate_unilateral_close_established_channel(
3328 &self,
3329 mut signed_channel: SignedChannel,
3330 is_initiator: bool,
3331 buffer_adaptor_signature: EcdsaAdaptorSignature,
3332 buffer_transaction: Transaction,
3333 ) -> Result<(), Error> {
3334 let keys_id = signed_channel.keys_id().ok_or_else(|| {
3335 Error::InvalidState("Expected to be in a state with an associated keys id.".to_string())
3336 })?;
3337
3338 crate::channel_updater::initiate_unilateral_close_established_channel(
3339 &self.secp,
3340 &mut signed_channel,
3341 buffer_adaptor_signature,
3342 keys_id,
3343 buffer_transaction,
3344 &self.signer_provider,
3345 is_initiator,
3346 )?;
3347
3348 let buffer_transaction =
3349 get_signed_channel_state!(signed_channel, Closing, ref buffer_transaction)?;
3350
3351 self.blockchain.send_transaction(buffer_transaction).await?;
3352
3353 self.chain_monitor
3354 .lock()
3355 .await
3356 .remove_tx(&buffer_transaction.compute_txid());
3357
3358 self.store
3359 .upsert_channel(Channel::Signed(signed_channel), None)
3360 .await?;
3361
3362 self.store
3363 .persist_chain_monitor(&*self.chain_monitor.lock().await)
3364 .await?;
3365
3366 Ok(())
3367 }
3368
3369 async fn close_settled_channel(
3371 &self,
3372 signed_channel: SignedChannel,
3373 is_initiator: bool,
3374 ) -> Result<(), Error> {
3375 let (settle_tx, closed_channel) = crate::channel_updater::close_settled_channel(
3376 &self.secp,
3377 &signed_channel,
3378 &self.signer_provider,
3379 is_initiator,
3380 )?;
3381
3382 if self
3383 .blockchain
3384 .get_transaction_confirmations(&settle_tx.compute_txid())
3385 .await
3386 .unwrap_or(0)
3387 == 0
3388 {
3389 self.blockchain.send_transaction(&settle_tx).await?;
3390 }
3391
3392 self.chain_monitor
3393 .lock()
3394 .await
3395 .cleanup_channel(signed_channel.channel_id);
3396
3397 self.store.upsert_channel(closed_channel, None).await?;
3398
3399 Ok(())
3400 }
3401
3402 async fn get_collaboratively_closed_contract(
3403 &self,
3404 contract_id: &ContractId,
3405 payout: Amount,
3406 is_own_payout: bool,
3407 ) -> Result<ClosedContract, Error> {
3408 let contract = get_contract_in_state!(self, contract_id, Confirmed, None::<PublicKey>)?;
3409 let own_collateral = if contract.accepted_contract.offered_contract.is_offer_party {
3410 contract
3411 .accepted_contract
3412 .offered_contract
3413 .offer_params
3414 .collateral
3415 } else {
3416 contract.accepted_contract.accept_params.collateral
3417 };
3418 let own_payout = if is_own_payout {
3419 payout
3420 } else {
3421 contract.accepted_contract.offered_contract.total_collateral - payout
3422 };
3423 let pnl =
3424 SignedAmount::from_sat(own_payout.to_sat() as i64 - own_collateral.to_sat() as i64);
3425 Ok(ClosedContract {
3426 attestations: None,
3427 signed_cet: None,
3428 contract_id: *contract_id,
3429 temporary_contract_id: contract.accepted_contract.offered_contract.id,
3430 counter_party_id: contract.accepted_contract.offered_contract.counter_party,
3431 funding_txid: contract
3432 .accepted_contract
3433 .dlc_transactions
3434 .fund
3435 .compute_txid(),
3436 pnl,
3437 })
3438 }
3439
3440 async fn oracle_announcements(
3441 &self,
3442 contract_input: &ContractInput,
3443 ) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
3444 let announcements = stream::iter(contract_input.contract_infos.iter())
3445 .map(|x| {
3446 let future = self.get_oracle_announcements(&x.oracles);
3447 async move {
3448 match future.await {
3449 Ok(result) => Ok(result),
3450 Err(e) => {
3451 log_error!(self.logger, "Failed to get oracle announcements: {}", e);
3452 Err(e)
3453 }
3454 }
3455 }
3456 })
3457 .collect::<FuturesUnordered<_>>()
3458 .await
3459 .try_collect::<Vec<_>>()
3460 .await?;
3461 Ok(announcements)
3462 }
3463}