1mod models;
114mod vtxo;
115pub mod bdk;
116pub(crate) mod progress;
117pub(crate) mod transaction_manager;
118
119pub use self::models::{
120 ExitCpfpRequest, ExitTransactionPackage, FeeInfo, RbfRequirement, TransactionInfo,
121 ChildTransactionInfo, ExitError, ExitState, ExitTx, ExitTxStatus, ExitTxOrigin, ExitStartState,
122 ExitProcessingState, ExitAwaitingDeltaState, ExitClaimableState, ExitClaimInProgressState,
123 ExitClaimedState, ExitProgressStatus, ExitTransactionStatus,
124};
125pub use self::vtxo::ExitVtxo;
126
127use std::borrow::Borrow;
128use std::cmp;
129use std::collections::HashMap;
130use std::sync::Arc;
131
132use anyhow::Context;
133use bitcoin::{
134 Address, Amount, FeeRate, Psbt, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness, sighash
135};
136use bitcoin::consensus::Params;
137use log::{error, info, trace, warn};
138
139use ark::{Vtxo, VtxoId};
140use ark::vtxo::Bare;
141use ark::vtxo::policy::signing::VtxoSigner;
142use bitcoin_ext::{BlockHeight, P2TR_DUST};
143
144use crate::Wallet;
145use crate::chain::ChainSource;
146use crate::exit::transaction_manager::ExitTransactionManager;
147use crate::movement::{MovementDestination, MovementStatus, PaymentMethod};
148use crate::movement::manager::MovementManager;
149use crate::movement::update::MovementUpdate;
150
151use crate::persist::BarkPersister;
152use crate::persist::models::StoredExit;
153use crate::psbtext::PsbtInputExt;
154use crate::subsystem::{ExitMovement, Subsystem};
155use crate::vtxo::{VtxoState, VtxoStateKind};
156
157pub(crate) struct ExitInner {
159 tx_manager: ExitTransactionManager,
160 persister: Arc<dyn BarkPersister>,
161 chain_source: Arc<ChainSource>,
162 movement_manager: Arc<MovementManager>,
163
164 exit_vtxos: Vec<ExitVtxo>,
165}
166
167impl ExitInner {
168 async fn start_exit_for_vtxos(
171 &mut self,
172 vtxos: &[impl Borrow<Vtxo<Bare>>],
173 ) -> anyhow::Result<()> {
174 if vtxos.is_empty() {
175 return Ok(());
176 }
177 let tip = self.chain_source.tip().await?;
178 let params = Params::new(self.chain_source.network());
179 for vtxo in vtxos {
180 let vtxo = vtxo.borrow();
181 let vtxo_id = vtxo.id();
182 if self.exit_vtxos.iter().any(|ev| ev.id() == vtxo_id) {
183 warn!("VTXO {} is already in the exit process", vtxo_id);
184 continue;
185 }
186
187 if vtxo.amount() < P2TR_DUST {
189 return Err(ExitError::DustLimit {
190 vtxo: vtxo.amount(),
191 dust: P2TR_DUST,
192 }.into());
193 }
194
195 trace!("Starting exit for VTXO: {}", vtxo_id);
198 let exit = ExitVtxo::new(vtxo, tip);
199 self.persister.store_exit_vtxo_entry(&StoredExit::new(&exit)).await?;
200 self.persister.update_vtxo_state_checked(
201 vtxo_id, VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
202 ).await?;
203 self.exit_vtxos.push(exit);
204 trace!("Exit for VTXO started successfully: {}", vtxo_id);
205
206 let balance = -vtxo.amount().to_signed()?;
208 let script_pubkey = vtxo.output_script_pubkey();
209 let payment_method = match Address::from_script(&script_pubkey, ¶ms) {
210 Ok(addr) => PaymentMethod::Bitcoin(addr.into_unchecked()),
211 Err(e) => {
212 warn!("Unable to convert script pubkey to address: {:#}", e);
213 PaymentMethod::OutputScript(script_pubkey)
214 }
215 };
216
217 self.movement_manager.new_finished_movement(
221 Subsystem::EXIT,
222 ExitMovement::Exit.to_string(),
223 MovementStatus::Successful,
224 MovementUpdate::new()
225 .intended_and_effective_balance(balance)
226 .consumed_vtxo(vtxo_id)
227 .sent_to([MovementDestination::new(payment_method, vtxo.amount())]),
228 ).await.context("Failed to register exit movement")?;
229 }
230 Ok(())
231 }
232
233 async fn refresh_tx_state(&mut self) -> anyhow::Result<()> {
235 let mut exit_vtxos = std::mem::take(&mut self.exit_vtxos);
236 for exit in &mut exit_vtxos {
237 if !exit.is_initialized() {
238 match exit.initialize(&mut self.tx_manager, &*self.persister).await {
239 Ok(()) => continue,
240 Err(e) => {
241 error!("Error initializing exit for VTXO {}: {:#}", exit.id(), e);
242 }
243 }
244 }
245 }
246 self.exit_vtxos = exit_vtxos;
247 self.tx_manager.sync().await?;
248 Ok(())
249 }
250
251 async fn sign_exit_claim_inputs(
254 &self,
255 psbt: &mut Psbt,
256 wallet: &Wallet,
257 ) -> anyhow::Result<()> {
258 let prevouts = psbt.inputs.iter()
259 .map(|i| i.witness_utxo.clone().unwrap())
260 .collect::<Vec<_>>();
261
262 let prevouts = sighash::Prevouts::All(&prevouts);
263 let mut shc = sighash::SighashCache::new(&psbt.unsigned_tx);
264
265 let claimable = self.exit_vtxos.iter()
266 .filter(|ev| ev.is_claimable())
267 .map(|e| (e.id(), e))
268 .collect::<HashMap<_, _>>();
269
270 for (i, input) in psbt.inputs.iter_mut().enumerate() {
271 let vtxo = input.get_exit_claim_input();
272
273 if let Some(vtxo) = vtxo {
274 let exit_vtxo = claimable.get(&vtxo.id()).context("vtxo is not claimable yet")?;
275
276 let witness = wallet.sign_input(&vtxo, i, &mut shc, &prevouts).await
277 .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
278
279 input.final_script_witness = Some(witness);
280 let _ = exit_vtxo;
281 }
282 }
283
284 Ok(())
285 }
286}
287
288pub struct Exit {
291 inner: Arc<tokio::sync::RwLock<ExitInner>>,
292}
293
294impl Exit {
295 pub(crate) async fn new(
296 persister: Arc<dyn BarkPersister>,
297 chain_source: Arc<ChainSource>,
298 movement_manager: Arc<MovementManager>,
299 ) -> anyhow::Result<Exit> {
300 let tx_manager = ExitTransactionManager::new(persister.clone(), chain_source.clone())?;
301 let inner = ExitInner {
302 exit_vtxos: Vec::new(),
303 tx_manager,
304 persister,
305 chain_source,
306 movement_manager,
307 };
308 Ok(Exit { inner: Arc::new(tokio::sync::RwLock::new(inner)) })
309 }
310
311 pub(crate) async fn load(&self) -> anyhow::Result<()> {
312 let mut guard = self.inner.write().await;
313 let inner = &mut *guard;
314 let exit_vtxo_entries = inner.persister.get_exit_vtxo_entries().await?;
315 inner.exit_vtxos.reserve(exit_vtxo_entries.len());
316
317 for entry in exit_vtxo_entries {
318 if let Some(vtxo) = inner.persister.get_wallet_vtxo(entry.vtxo_id).await? {
319 let mut exit = ExitVtxo::from_entry(entry, &vtxo);
320 exit.initialize(&mut inner.tx_manager, &*inner.persister).await?;
321 inner.exit_vtxos.push(exit);
322 } else {
323 error!("VTXO {} is marked for exit but it's missing from the database", entry.vtxo_id);
324 }
325 }
326 Ok(())
327 }
328
329 pub async fn get_exit_status(
336 &self,
337 vtxo_id: VtxoId,
338 include_history: bool,
339 include_transactions: bool,
340 ) -> Result<Option<ExitTransactionStatus>, ExitError> {
341 let guard = self.inner.read().await;
342 match guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id) {
343 None => Ok(None),
344 Some(exit) => {
345 let mut txs = Vec::new();
346 if include_transactions {
347 if let Some(txids) = exit.txids() {
348 txs.reserve(txids.len());
349 for txid in txids {
350 txs.push(guard.tx_manager.get_package(*txid)?.read().await.clone());
351 }
352 } else {
353 let exit_vtxo = exit.get_full_vtxo(&*guard.persister).await?;
358 for tx in exit_vtxo.transactions() {
359 txs.push(ExitTransactionPackage {
360 exit: TransactionInfo {
361 txid: tx.tx.compute_txid(),
362 tx: tx.tx,
363 },
364 child: None,
365 })
366 }
367 }
368 }
369 Ok(Some(ExitTransactionStatus {
370 vtxo_id: exit.id(),
371 state: exit.state().clone(),
372 history: if include_history { Some(exit.history().clone()) } else { None },
373 transactions: txs,
374 }))
375 },
376 }
377 }
378
379 pub async fn get_exit_vtxo(&self, vtxo_id: VtxoId) -> Option<ExitVtxo> {
381 let guard = self.inner.read().await;
382 guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id).cloned()
383 }
384
385 pub async fn get_exit_vtxos(&self) -> Vec<ExitVtxo> {
387 let guard = self.inner.read().await;
388 guard.exit_vtxos.clone()
389 }
390
391 pub async fn has_pending_exits(&self) -> bool {
393 let guard = self.inner.read().await;
394 guard.exit_vtxos.iter().any(|ev| ev.state().is_pending())
395 }
396
397 pub fn try_pending_total(&self) -> Option<Amount> {
399 self.inner.try_read().ok().map(|guard| {
400 guard.exit_vtxos.iter()
401 .filter_map(|ev| if ev.state().is_pending() { Some(ev.amount()) } else { None })
402 .sum()
403 })
404 }
405
406 pub async fn all_claimable_at_height(&self) -> Option<BlockHeight> {
408 let guard = self.inner.read().await;
409 let mut highest_claimable_height = None;
410 for exit in &guard.exit_vtxos {
411 if matches!(exit.state(), ExitState::Claimed(..)) {
412 continue;
413 }
414 match exit.state().claimable_height() {
415 Some(h) => highest_claimable_height = cmp::max(highest_claimable_height, Some(h)),
416 None => return None,
417 }
418 }
419 highest_claimable_height
420 }
421
422 pub async fn start_exit_for_entire_wallet(&self) -> anyhow::Result<()> {
429 let mut guard = self.inner.write().await;
430 let all_vtxos = guard.persister.get_vtxos_by_state(&VtxoStateKind::UNSPENT_STATES).await?
431 .into_iter().map(|v| v.vtxo);
432
433 let (eligible, dust) = all_vtxos.partition::<Vec<_>, _>(|v| v.amount() >= P2TR_DUST);
435
436 for vtxo in &dust {
438 warn!(
439 "Skipping dust VTXO {}: {} sats is below the dust limit ({} sats).",
440 vtxo.id(), vtxo.amount().to_sat(), P2TR_DUST.to_sat()
441 );
442 }
443
444 if eligible.is_empty() && !dust.is_empty() {
446 warn!(
447 "Exit not started: all {} VTXOs (total {}) are below the dust limit. \
448 To exit and consolidate dust, you need to refresh your VTXOs first \
449 (requires total balance >= {})",
450 dust.len(),
451 dust.iter().map(|v| v.amount()).sum::<Amount>(),
452 P2TR_DUST,
453 );
454 return Ok(());
455 }
456
457 guard.start_exit_for_vtxos(&eligible).await
458 }
459
460 pub async fn start_exit_for_vtxos(
467 &self,
468 vtxos: &[impl Borrow<Vtxo<Bare>>],
469 ) -> anyhow::Result<()> {
470 let mut guard = self.inner.write().await;
471 guard.start_exit_for_vtxos(vtxos).await
472 }
473
474 pub(crate) async fn dangerous_clear_exit(&self) -> anyhow::Result<()> {
478 let mut guard = self.inner.write().await;
479 for exit in &guard.exit_vtxos {
480 guard.persister.remove_exit_vtxo_entry(&exit.id()).await?;
481 }
482 guard.exit_vtxos.clear();
483 Ok(())
484 }
485
486 pub async fn progress_exits(
499 &self,
500 wallet: &Wallet,
501 ) -> anyhow::Result<Option<Vec<ExitProgressStatus>>> {
502 let mut guard = self.inner.write().await;
503 guard.refresh_tx_state().await?;
504 let mut exit_vtxos = std::mem::take(&mut guard.exit_vtxos);
505 let mut exit_statuses = Vec::with_capacity(exit_vtxos.len());
506
507 for ev in exit_vtxos.iter_mut() {
508 if !ev.is_initialized() {
509 warn!("Skipping progress of uninitialized unilateral exit {}", ev.id());
510 continue;
511 }
512
513 info!("Progressing exit for VTXO {}", ev.id());
514 let error = match ev.progress(
515 wallet,
516 &mut guard.tx_manager,
517 true,
518 ).await {
519 Ok(_) => None,
520 Err(e) => {
521 match &e {
522 ExitError::InsufficientConfirmedFunds { .. } => {
523 warn!("Can't progress exit for VTXO {} at this time: {}", ev.id(), e);
524 },
525 _ => {
526 error!("Error progressing exit for VTXO {}: {}", ev.id(), e);
527 }
528 }
529 Some(e)
530 }
531 };
532 if !matches!(ev.state(), ExitState::Claimed(..)) {
533 exit_statuses.push(ExitProgressStatus {
534 vtxo_id: ev.id(),
535 state: ev.state().clone(),
536 error,
537 });
538 }
539 }
540
541 guard.exit_vtxos = exit_vtxos;
542 Ok(Some(exit_statuses))
543 }
544
545 pub async fn sync(
549 &self,
550 wallet: &Wallet,
551 ) -> anyhow::Result<()> {
552 let mut guard = self.inner.write().await;
553 guard.refresh_tx_state().await?;
554 let mut exit_vtxos = std::mem::take(&mut guard.exit_vtxos);
555 for exit in &mut exit_vtxos {
556 if let Err(e) = exit.progress(
557 wallet, &mut guard.tx_manager, false,
558 ).await {
559 error!("Error syncing exit for VTXO {}: {}", exit.id(), e);
560 }
561 }
562 guard.exit_vtxos = exit_vtxos;
563 Ok(())
564 }
565
566
567 pub async fn exits_needing_cpfp(&self) -> Vec<ExitCpfpRequest> {
574 let guard = self.inner.read().await;
575 let mut requests = Vec::new();
576 for ev in &guard.exit_vtxos {
577 let ExitState::Processing(s) = ev.state() else { continue };
578 for tx in &s.transactions {
579 let rbf_requirement = match &tx.status {
580 ExitTxStatus::AwaitingCpfpBroadcast => None,
581 ExitTxStatus::AwaitingConfirmation { origin: ExitTxOrigin::Mempool, .. } => {
582 match guard.tx_manager.get_child_status(tx.txid).await {
587 Ok(Some(c)) => match c.fee_info {
588 Some(fi) => Some(RbfRequirement {
589 min_fee_rate: fi.fee_rate,
590 current_package_fee: fi.total_fee,
591 }),
592 None => continue,
593 },
594 _ => continue,
595 }
596 },
597 _ => continue,
598 };
599 let package = match guard.tx_manager.get_package(tx.txid) {
600 Ok(p) => p,
601 Err(_) => continue,
602 };
603 let exit_tx = package.read().await.exit.tx.clone();
604 requests.push(ExitCpfpRequest {
605 vtxo_id: ev.id(),
606 exit_tx,
607 rbf_requirement,
608 });
609 }
610 }
611 requests
612 }
613
614 pub async fn provide_cpfp_tx(
627 &self,
628 wallet: &Wallet,
629 exit_txid: Txid,
630 child_tx: Transaction,
631 ) -> anyhow::Result<(), ExitError> {
632 let origin = ExitTxOrigin::Wallet { confirmed_in: None };
633 let mut guard = self.inner.write().await;
634 let inner = &mut *guard;
635 inner.tx_manager.set_wallet_child_tx(exit_txid, child_tx, origin).await?;
636
637 let package = inner.tx_manager.get_package(exit_txid)?;
638 let pkg_guard = package.read().await;
639 match inner.tx_manager.broadcast_package(&*pkg_guard).await {
640 Ok(_) => {},
641 Err(ExitError::ExitPackageBroadcastFailure { ref error, .. })
642 if error.is_mempool_conflict() =>
643 {
644 warn!("CPFP broadcast conflict for {}: {} — another CPFP may already be in mempool", exit_txid, error);
645 },
646 Err(e) => return Err(e),
647 }
648 drop(pkg_guard);
649
650 for ev in inner.exit_vtxos.iter_mut() {
651 let ExitState::Processing(s) = ev.state() else { continue };
652 let has_tx = s.transactions.iter().any(|tx| tx.txid == exit_txid);
653 if has_tx {
654 if let Err(e) = ev.progress(wallet, &mut inner.tx_manager, false).await {
655 warn!("Failed to progress exit for {} after CPFP: {}", exit_txid, e);
656 }
657 break;
658 }
659 }
660
661 Ok(())
662 }
663
664 pub async fn list_claimable(&self) -> Vec<ExitVtxo> {
666 let guard = self.inner.read().await;
667 guard.exit_vtxos.iter().filter(|ev| ev.is_claimable()).cloned().collect()
668 }
669
670 pub async fn sign_exit_claim_inputs(&self, psbt: &mut Psbt, wallet: &Wallet) -> anyhow::Result<()> {
678 let guard = self.inner.read().await;
679 guard.sign_exit_claim_inputs(psbt, wallet).await
680 }
681
682 pub async fn drain_exits(
691 &self,
692 inputs: &[impl Borrow<ExitVtxo>],
693 wallet: &Wallet,
694 address: Address,
695 fee_rate_override: Option<FeeRate>,
696 ) -> anyhow::Result<Psbt, ExitError> {
697 let guard = self.inner.read().await;
698
699 let tip = guard.chain_source.tip().await
700 .map_err(|e| ExitError::TipRetrievalFailure { error: e.to_string() })?;
701
702 if inputs.is_empty() {
703 return Err(ExitError::ClaimMissingInputs);
704 }
705 let mut vtxos = HashMap::with_capacity(inputs.len());
706 for input in inputs {
707 let i = input.borrow();
708 let vtxo = i.get_full_vtxo(&*guard.persister).await?;
709 vtxos.insert(i.id(), vtxo);
710 }
711
712 let mut tx = {
713 let mut output_amount = Amount::ZERO;
714 let mut tx_ins = Vec::with_capacity(inputs.len());
715 for input in inputs {
716 let input = input.borrow();
717 let vtxo = &vtxos[&input.id()];
718 if !matches!(input.state(), ExitState::Claimable(..)) {
719 return Err(ExitError::VtxoNotClaimable { vtxo: input.id() });
720 }
721
722 output_amount += vtxo.amount();
723
724 let clause = wallet.find_signable_clause(vtxo).await
725 .ok_or(ExitError::ClaimMissingSignableClause { vtxo: vtxo.id() })?;
726
727 tx_ins.push(TxIn {
728 previous_output: vtxo.point(),
729 script_sig: ScriptBuf::default(),
730 sequence: clause.sequence().unwrap_or(Sequence::ZERO),
731 witness: Witness::new(),
732 });
733 }
734
735 let locktime = bitcoin::absolute::LockTime::from_height(tip)
736 .map_err(|e| ExitError::InvalidLocktime { tip, error: e.to_string() })?;
737
738 Transaction {
739 version: bitcoin::transaction::Version(3),
740 lock_time: locktime,
741 input: tx_ins,
742 output: vec![
743 TxOut {
744 script_pubkey: address.script_pubkey(),
745 value: output_amount,
746 },
747 ],
748 }
749 };
750
751 let create_psbt = |tx: Transaction| async {
753 let mut psbt = Psbt::from_unsigned_tx(tx)
754 .map_err(|e| ExitError::InternalError {
755 error: format!("Failed to create exit claim PSBT: {}", e),
756 })?;
757 psbt.inputs.iter_mut().zip(inputs).for_each(|(i, e)| {
758 let v = &vtxos[&e.borrow().id()];
759 i.set_exit_claim_input(v);
760 i.witness_utxo = Some(v.txout())
761 });
762 guard.sign_exit_claim_inputs(&mut psbt, wallet).await
763 .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
764 Ok(psbt)
765 };
766 let fee_amount = {
767 let fee_rate = fee_rate_override
768 .unwrap_or(guard.chain_source.fee_rates().await.regular);
769 fee_rate * create_psbt(tx.clone()).await?
770 .extract_tx()
771 .map_err(|e| ExitError::InternalError {
772 error: format!("Failed to get tx from signed exit claim PSBT: {}", e),
773 })?
774 .weight()
775 };
776
777 let needed = fee_amount + P2TR_DUST;
779 if needed > tx.output[0].value {
780 return Err(ExitError::ClaimFeeExceedsOutput {
781 needed, output: tx.output[0].value,
782 });
783 }
784 tx.output[0].value -= fee_amount;
785
786 create_psbt(tx).await
788 }
789}
790