1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
// Copyright (c) 2026 Kirky.X
//
// Licensed under the MIT License
// See LICENSE file in the project root for full license information.
//! 连接池管理模块
//!
//! 提供数据库连接池的创建、管理和自动修正功能
use async_trait::async_trait;
#[cfg(feature = "permission")]
use lru::LruCache;
#[cfg(feature = "permission")]
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
#[cfg(feature = "permission")]
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{Mutex as AsyncMutex, Notify};
#[cfg(feature = "pool-health-check")]
use tokio::time::interval;
use tokio::time::timeout;
use tracing::info;
use super::Session;
#[cfg(feature = "permission")]
use crate::RolePolicy;
use crate::config::{ConfigError, DbConfig, DbConfigBuilder};
use crate::error::{DbError, DbResult};
#[cfg(feature = "metrics")]
use crate::metrics::MetricsCollector;
#[cfg(feature = "permission-engine")]
use crate::permission_engine::PermissionProvider;
#[cfg(feature = "permission")]
use crate::{SimplePermissionConfig, SimplePermissionProvider};
// 导入 Sea-ORM 的连接 trait
use sea_orm::ConnectionTrait;
/// 数据库连接类型( crate 内部使用,不对外暴露)
pub(crate) type DatabaseConnection = sea_orm::DatabaseConnection;
/// 连接池管理器
#[derive(Clone)]
pub struct DbPool {
/// 内部连接池
inner: Arc<DbPoolInner>,
}
/// DbPoolInner 结构体
///
/// 包含连接池的所有内部状态
pub(crate) struct DbPoolInner {
/// 配置
pub(crate) config: DbConfig,
/// 空闲连接队列
idle_connections: AsyncMutex<Vec<DatabaseConnection>>,
/// 连接可用通知(替代忙等待)
connection_available: Notify,
/// 连接计数器(组合计数器:高32位为total_count,低32位为active_count)
/// 这样可以确保 total_count 和 active_count 的更新是原子的
connection_counts: AtomicU64,
/// 活跃连接数(保留用于向后兼容,已废弃)
#[deprecated(note = "Use connection_counts instead")]
pub(super) active_count: AtomicU32,
/// 总连接数(保留用于向后兼容,已废弃)
#[deprecated(note = "Use connection_counts instead")]
pub(super) total_count: AtomicU32,
/// 权限策略 LRU 缓存(使用 tokio 异步读写锁)
#[cfg(feature = "permission")]
pub(crate) policy_cache: Arc<AsyncRwLock<LruCache<String, RolePolicy>>>,
/// 权限提供者(懒加载)
#[cfg(feature = "permission")]
permission_provider: Arc<AsyncMutex<Option<Arc<dyn PermissionProvider + Send + Sync>>>>,
/// 后台健康检查任务(用于优雅关闭)
health_check_shutdown: Arc<Notify>,
/// 管理员角色名称
pub(super) admin_role: String,
/// 指标收集器(可选,用于 metrics 特性)
#[cfg(feature = "metrics")]
pub(crate) metrics_collector: Option<Arc<MetricsCollector>>,
/// 等待计数
pub(super) wait_count: AtomicU32,
/// 借用计数
pub(super) borrow_count: AtomicU64,
/// 最大活跃连接数
pub(super) max_active: AtomicU32,
}
impl DbPoolInner {
/// 获取总连接数(从组合计数器中提取)
pub(super) fn get_total_count(&self) -> u32 {
let counts = self.connection_counts.load(Ordering::SeqCst);
(counts >> 32) as u32
}
/// 获取活跃连接数(从组合计数器中提取)
pub(super) fn get_active_count(&self) -> u32 {
let counts = self.connection_counts.load(Ordering::SeqCst);
(counts & 0xFFFFFFFF) as u32
}
/// 原子性地增加总连接数和活跃连接数
pub(super) fn increment_counts(&self) {
let counts = self.connection_counts.load(Ordering::SeqCst);
let total = (counts >> 32) as u32 + 1;
let active = (counts & 0xFFFFFFFF) as u32 + 1;
let new_counts = ((total as u64) << 32) | (active as u64);
self.connection_counts.store(new_counts, Ordering::SeqCst);
}
/// 原子性地减少活跃连接数
pub(super) fn decrement_active_count(&self) {
let counts = self.connection_counts.load(Ordering::SeqCst);
let total = (counts >> 32) as u32;
let active = (counts & 0xFFFFFFFF) as u32;
// 防止下溢
let new_active = if active > 0 { active - 1 } else { 0 };
let new_counts = ((total as u64) << 32) | (new_active as u64);
self.connection_counts.store(new_counts, Ordering::SeqCst);
}
/// 原子性地减少总连接数
pub(super) fn decrement_total_count(&self) {
let counts = self.connection_counts.load(Ordering::SeqCst);
let total = (counts >> 32) as u32;
let active = (counts & 0xFFFFFFFF) as u32;
// 防止下溢
let new_total = if total > 0 { total - 1 } else { 0 };
let new_counts = ((new_total as u64) << 32) | (active as u64);
self.connection_counts.store(new_counts, Ordering::SeqCst);
}
/// 获取权限提供者(如果存在)
#[cfg(feature = "permission")]
pub(crate) fn get_permission_provider(&self) -> Option<Arc<dyn PermissionProvider + Send + Sync>> {
self.permission_provider.try_lock().ok().and_then(|p| p.clone())
}
/// 获取并移除权限提供者(用于初始化 Session)
#[cfg(feature = "permission")]
pub(crate) fn take_permission_provider(&self) -> Option<Arc<dyn PermissionProvider + Send + Sync>> {
self.permission_provider.try_lock().ok().and_then(|mut p| p.take())
}
}
impl DbPool {
fn update_max_active(&self, active: u32) {
let mut current = self.inner.max_active.load(Ordering::Relaxed);
while active > current {
match self
.inner
.max_active
.compare_exchange(current, active, Ordering::SeqCst, Ordering::Relaxed)
{
Ok(_) => return,
Err(observed) => current = observed,
}
}
}
// 计数器原子性辅助方法
// 使用 AtomicU64 的高32位存储 total_count,低32位存储 active_count
/// 获取总连接数
fn get_total_count(&self) -> u32 {
(self.inner.connection_counts.load(Ordering::SeqCst) >> 32) as u32
}
/// 获取活跃连接数
fn get_active_count(&self) -> u32 {
(self.inner.connection_counts.load(Ordering::SeqCst) & 0xFFFFFFFF) as u32
}
/// 原子性地增加 total_count 和 active_count
fn increment_counts(&self) {
loop {
let current = self.inner.connection_counts.load(Ordering::SeqCst);
let total = (current >> 32) as u32 + 1;
let active = (current & 0xFFFFFFFF) as u32 + 1;
let new_count = ((total as u64) << 32) | (active as u64);
match self
.inner
.connection_counts
.compare_exchange(current, new_count, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => break,
Err(_) => continue,
}
}
}
/// 原子性地减少 active_count
fn decrement_active_count(&self) {
loop {
let current = self.inner.connection_counts.load(Ordering::SeqCst);
let total = (current >> 32) as u32;
let active = (current & 0xFFFFFFFF) as u32;
if active == 0 {
// active_count 不应该小于 0,但为了安全起见,我们不做任何操作
break;
}
let new_count = ((total as u64) << 32) | ((active - 1) as u64);
match self
.inner
.connection_counts
.compare_exchange(current, new_count, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => break,
Err(_) => continue,
}
}
}
/// 原子性地减少 total_count
fn decrement_total_count(&self) {
loop {
let current = self.inner.connection_counts.load(Ordering::SeqCst);
let total = (current >> 32) as u32;
let active = (current & 0xFFFFFFFF) as u32;
if total == 0 {
// total_count 不应该小于 0,但为了安全起见,我们不做任何操作
break;
}
let new_count = (((total - 1) as u64) << 32) | (active as u64);
match self
.inner
.connection_counts
.compare_exchange(current, new_count, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => break,
Err(_) => continue,
}
}
}
/// 创建新的连接池
///
/// # Arguments
///
/// * `url` - 数据库连接 URL
///
/// # Errors
///
/// 如果 URL 格式无效或不支持,返回错误
///
/// # Example
///
/// ```ignore
/// use dbnexus::DbPool;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let pool = DbPool::new("sqlite://example.db").await?;
/// Ok(())
/// }
/// ```
pub async fn new(url: &str) -> DbResult<Self> {
// 使用构建器创建带默认值的配置
let config = DbConfigBuilder::new()
.url(url)
.max_connections(20)
.min_connections(5)
.idle_timeout(300)
.acquire_timeout(5000)
.admin_role("admin")
.build()
.map_err(|e| sea_orm::DbErr::Custom(format!("Config validation failed: {:?}", e)))?;
Self::with_config(config).await
}
/// 使用配置创建连接池(带自动修正)
pub async fn with_config(config: DbConfig) -> DbResult<Self> {
// 使用配置修正器自动修正配置
let corrected_config = crate::config::ConfigCorrector::auto_correct(config);
// 创建初始连接以查询数据库能力
let db_type = crate::config::DatabaseType::parse_database_type(&corrected_config.url_sanitized());
// 创建连接并应用数据库能力修正
let connection = sea_orm::Database::connect(corrected_config.url_for_connection())
.await
.map_err(DbError::Connection)?;
// 应用数据库能力修正(如果需要)
let corrected_config = crate::config::ConfigCorrector::auto_correct_with_database_capability(
corrected_config,
&connection,
db_type, // DatabaseType implements Copy, no need to clone
)
.await;
// 输出配置修正信息
if corrected_config.max_connections() < 100 && db_type.is_real_database() {
info!(
"Database connection limit: 80% of {} = {} connections",
corrected_config.max_connections(),
corrected_config.max_connections()
);
}
#[cfg(feature = "permission")]
let policy_cache = Arc::new(AsyncRwLock::new(LruCache::new(
NonZeroUsize::new(4096).expect("LRU cache size must be non-zero"),
)));
// 加载权限提供者(如果指定了路径)
#[cfg(feature = "permission")]
let permission_provider = Self::load_permission_provider(&corrected_config).await;
#[cfg(not(feature = "permission"))]
let permission_provider = None;
let pool = Self {
inner: Arc::new(DbPoolInner {
config: corrected_config.clone(),
idle_connections: AsyncMutex::new(Vec::new()),
connection_available: Notify::new(),
connection_counts: AtomicU64::new(0),
active_count: AtomicU32::new(0),
total_count: AtomicU32::new(0),
#[cfg(feature = "permission")]
policy_cache,
#[cfg(feature = "permission")]
permission_provider: Arc::new(AsyncMutex::new(permission_provider)),
health_check_shutdown: Arc::new(Notify::new()),
admin_role: corrected_config.admin_role().to_string(),
#[cfg(feature = "metrics")]
metrics_collector: None,
wait_count: AtomicU32::new(0),
borrow_count: AtomicU64::new(0),
max_active: AtomicU32::new(0),
}),
};
// 启动后台健康检查任务
#[cfg(feature = "pool-health-check")]
pool.start_background_health_check();
// 预创建最小连接数(并行创建以提高启动速度,带超时和重试)
#[cfg(feature = "pool-warmup")]
{
let initial_connections = pool.inner.config.min_connections();
let warmup_timeout = Duration::from_secs(pool.inner.config.warmup_timeout());
let warmup_retries = pool.inner.config.warmup_retries();
let mut connection_tasks = Vec::new();
for _ in 0..initial_connections {
let config = corrected_config.clone();
connection_tasks.push(async move {
let mut retries = 0;
let mut last_error = None;
while retries <= warmup_retries {
match timeout(warmup_timeout, Self::create_connection(&config)).await {
Ok(Ok(conn)) => return Ok(conn),
Ok(Err(e)) => {
last_error = Some(e);
retries += 1;
if retries <= warmup_retries {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
Err(_) => {
last_error = Some(DbError::Connection(sea_orm::DbErr::ConnectionAcquire(
sea_orm::ConnAcquireErr::Timeout,
)));
break;
}
}
}
Err(last_error.unwrap_or_else(|| {
DbError::Connection(sea_orm::DbErr::ConnectionAcquire(sea_orm::ConnAcquireErr::Timeout))
}))
});
}
// 并行执行所有连接创建任务
let results = futures::future::join_all(connection_tasks).await;
let mut successful = 0;
let mut failed = 0;
for result in results {
match result {
Ok(conn) => {
pool.inner.idle_connections.lock().await.push(conn);
pool.inner.total_count.fetch_add(1, Ordering::SeqCst);
successful += 1;
}
Err(e) => {
tracing::error!("Failed to create initial connection: {}", e);
failed += 1;
}
}
}
info!(
"Connection pool initialized: {}/{} connections (min: {}, max: {}), {} failed",
successful,
initial_connections,
corrected_config.min_connections(),
corrected_config.max_connections(),
failed
);
}
// 加载权限策略到缓存
#[cfg(feature = "permission")]
{
let permission_provider_guard = pool.inner.permission_provider.lock().await;
if let Some(ref provider) = *permission_provider_guard {
info!("Permission provider loaded: {}", provider.name());
}
drop(permission_provider_guard);
}
#[cfg(feature = "auto-migrate")]
if corrected_config.auto_migrate() {
if let Some(migrations_dir) = corrected_config.migrations_dir() {
if migrations_dir.exists() {
info!(
"Auto-migrate enabled, running migrations from: {}",
migrations_dir.display()
);
let applied = pool.run_migrations(migrations_dir).await?;
info!("Auto-migrate completed: {} migrations applied", applied);
} else {
tracing::warn!(
"Auto-migrate enabled but migrations directory does not exist: {}",
migrations_dir.display()
);
}
} else {
tracing::warn!("Auto-migrate enabled but migrations_dir not configured");
}
}
Ok(pool)
}
/// 使用配置结构体创建连接池
///
/// 此方法接受一个 [`DbConfig`] 结构体,用于配置连接池的所有参数。
/// 与 [`Self::new`] 方法功能相同,但更适合从配置结构体直接初始化。
///
/// # Example
///
/// ```no_run
/// use dbnexus::{DbPool, DbConfigBuilder};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let config = DbConfigBuilder::new()
/// .url("sqlite::memory:")
/// .max_connections(10)
/// .min_connections(2)
/// .build()?;
///
/// let pool = DbPool::try_from_config(config).await?;
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// 如果连接失败或配置无效,返回错误
pub async fn try_from_config(config: DbConfig) -> DbResult<Self> {
Self::with_config(config).await
}
/// 使用配置引用同步创建连接池
///
/// 此方法是同步的,不会创建数据库连接。
/// 实际的连接池创建和连接验证在首次获取连接时进行。
///
/// # Example
///
/// ```rust
/// use dbnexus::{DbPool, DbConfigBuilder};
///
/// let config = DbConfigBuilder::new()
/// .url("sqlite::memory:")
/// .max_connections(10)
/// .min_connections(1)
/// .idle_timeout(300)
/// .acquire_timeout(5000)
/// .build()?;
///
/// let pool = DbPool::try_from(&config)?;
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
///
/// # Errors
///
/// 如果配置验证失败,返回错误
pub fn try_from(config: &DbConfig) -> Result<Self, ConfigError> {
config.validate()?;
Ok(Self {
inner: Arc::new(DbPoolInner {
config: config.clone_config(),
idle_connections: AsyncMutex::new(Vec::new()),
connection_available: Notify::new(),
connection_counts: AtomicU64::new(0),
active_count: AtomicU32::new(0),
total_count: AtomicU32::new(0),
#[cfg(feature = "permission")]
policy_cache: Arc::new(AsyncRwLock::new(LruCache::new(
NonZeroUsize::new(4096).expect("LRU cache size must be non-zero"),
))),
#[cfg(feature = "permission")]
permission_provider: Arc::new(AsyncMutex::new(None)),
health_check_shutdown: Arc::new(Notify::new()),
admin_role: config.admin_role().to_string(),
#[cfg(feature = "metrics")]
metrics_collector: None,
wait_count: AtomicU32::new(0),
borrow_count: AtomicU64::new(0),
max_active: AtomicU32::new(0),
}),
})
}
/// 加载权限配置文件
///
/// # Returns
///
/// - `Some(Arc<dyn PermissionProvider>)` - 成功加载的权限提供者
/// - `None` - 没有配置权限文件,使用默认的 allow_all 策略
#[cfg(feature = "permission")]
async fn load_permission_provider(config: &DbConfig) -> Option<Arc<dyn PermissionProvider + Send + Sync>> {
tracing::debug!(
"load_permission_provider called, permissions_path = {:?}",
config.permissions_path()
);
// 尝试从配置文件加载
if let Some(ref path) = config.permissions_path() {
tracing::info!("Loading permission provider from: {}", path);
match tokio::fs::read_to_string(path).await {
Ok(content) => {
tracing::debug!("Read {} bytes from permission config", content.len());
// 尝试解析为 SimplePermissionConfig 格式
match serde_yaml::from_str::<SimplePermissionConfig>(&content) {
Ok(perm_config) => {
tracing::info!("Successfully loaded permission config from: {}", path);
tracing::debug!("Permission config has {} roles", perm_config.roles.len());
return Some(
Arc::new(SimplePermissionProvider::new(perm_config)) as Arc<dyn PermissionProvider>
);
}
Err(e) => {
tracing::warn!("Failed to parse permission config from '{}': {}", path, e);
return None;
}
}
}
Err(e) => {
tracing::warn!("Failed to read permission config from '{}': {}", path, e);
return None;
}
}
}
// 没有配置权限文件,使用默认配置(允许所有操作)
tracing::debug!("No permission config path specified, using default allow-all config");
Some(Arc::new(SimplePermissionProvider::allow_all()) as Arc<dyn PermissionProvider>)
}
/// 获取指标收集器(如果已设置)
#[cfg(feature = "metrics")]
pub fn metrics(&self) -> Option<&Arc<MetricsCollector>> {
self.inner.metrics_collector.as_ref()
}
/// 获取当前应用的实际配置
///
/// 返回经过自动修正后的配置副本。
/// 如果配置从未被修正过,则返回传入的配置。
///
/// # Returns
///
/// 实际应用的配置(可能已被自动修正)
pub fn get_actual_config(&self) -> DbConfig {
crate::config::ConfigCorrector::get_actual_config(&self.inner.config)
}
/// 从池中获取 Session(带 metrics 支持)
///
/// # Arguments
///
/// * `role` - 角色名称,必须在权限配置中定义
///
/// # Errors
///
/// 如果角色未在权限配置中定义,返回错误
pub async fn get_session(&self, role: &str) -> DbResult<Session> {
// 验证角色名称
#[cfg(feature = "permission")]
self.validate_role_name(role).await?;
let connection = self.acquire_connection().await?;
let pool_ref = Arc::new(self.clone());
#[allow(unused_mut)]
let mut session = Session::new(connection, pool_ref, self.inner.clone(), role.to_string());
Ok(session)
}
/// Alias for get_session() - semantic naming for role-based access
///
/// This method provides a more semantic API when working with role-based access.
/// It accepts any type that can be converted into a String, making it flexible
/// for different role representations.
///
/// # Arguments
///
/// * `role` - 角色名称,必须在权限配置中定义
///
/// # Errors
///
/// 如果角色未在权限配置中定义,返回错误
///
/// # Example
///
/// ```rust
/// use dbnexus::DbPool;
///
/// # async fn example(pool: &DbPool) -> Result<(), Box<dyn std::error::Error>> {
/// let session = pool.with_role("admin").await?;
/// // Use session...
/// # Ok(())
/// # } /// ```
pub async fn with_role(&self, role: impl Into<String>) -> DbResult<Session> {
self.get_session(&role.into()).await
}
/// Health check - returns true if pool is healthy
///
/// A simple health check that verifies the pool has available capacity.
/// Returns true if the pool has not reached maximum connections,
/// indicating it can accept new connections.
///
/// # Returns
///
/// `true` if the pool has available capacity, `false` otherwise
///
/// # Example
///
/// ```rust
/// use dbnexus::DbPool;
///
/// # async fn example(pool: &DbPool) {
/// if pool.health_check().await {
/// println!("Pool is healthy and has available capacity");
/// }
/// # }
/// ```
pub async fn health_check(&self) -> bool {
let status = self.status();
status.active < status.total && status.total > 0
}
/// 验证角色名称是否在权限配置中定义
///
/// 仅在权限配置文件存在且成功加载时验证角色。
/// 如果没有配置权限文件,使用默认策略,不进行角色验证。
#[cfg(feature = "permission")]
async fn validate_role_name(&self, role: &str) -> DbResult<()> {
// 获取权限提供者锁
let permission_provider = self.inner.permission_provider.lock().await;
// 检查权限提供者是否存在(用户是否显式配置了权限文件)
if permission_provider.is_none() {
// 没有配置权限提供者时,使用安全默认策略
// 只允许预定义的安全角色,防止未授权访问
let safe_roles = ["admin", "system"];
if !safe_roles.contains(&role) {
tracing::warn!(
"Role '{}' is not allowed without explicit permission configuration",
role
);
return Err(DbError::Permission(format!(
"Role '{}' is not allowed without explicit permission configuration. Allowed roles: {}",
role,
safe_roles.join(", ")
)));
}
tracing::debug!("No permission provider configured, allowing safe role '{}'", role);
return Ok(());
}
tracing::debug!("Permission provider present, checking role '{}'", role);
// 检查角色是否有权限访问任何资源
if let Some(ref provider) = *permission_provider {
let resources = provider.get_allowed_resources(role).await;
if resources.is_empty() {
// 角色没有定义任何权限
tracing::warn!("Unknown role '{}' requested, no permissions defined", role);
return Err(DbError::Permission(format!(
"Role '{}' is not defined in permission configuration",
role
)));
}
}
Ok(())
}
/// 创建单个数据库连接
///
/// 使用配置中的 URL 建立新的数据库连接。
/// 此方法不进行连接池管理,仅创建原始连接。
///
/// # Arguments
///
/// * `config` - 数据库配置,包含连接 URL
///
/// # Returns
///
/// 成功创建的数据库连接
///
/// # Errors
///
/// 如果连接失败,返回数据库错误
async fn create_connection(config: &DbConfig) -> DbResult<DatabaseConnection> {
let conn = sea_orm::Database::connect(config.url_for_connection()).await?;
Ok(conn)
}
/// 检查连接健康状态
///
/// 通过执行轻量级查询来验证数据库连接的有效性。
/// 使用数据库特定的健康检查查询:
/// - SQLite: `SELECT 1`
/// - PostgreSQL: `SELECT 1`
/// - MySQL: `SELECT 1`
///
/// # Arguments
///
/// * `conn` - 要检查的数据库连接
///
/// # Returns
///
/// 如果连接有效返回 `true`,否则返回 `false`
pub async fn check_connection_health(&self, conn: &DatabaseConnection) -> bool {
let health_query = Self::get_health_check_query(&self.inner.config.url_sanitized());
let backend = Self::get_database_backend(&self.inner.config.url_sanitized());
// 创建带超时的健康检查
let result = timeout(
Duration::from_secs(5),
conn.execute_raw(sea_orm::Statement::from_string(backend, health_query.to_string())),
)
.await;
match result {
Ok(Ok(_)) => {
tracing::debug!("Connection health check passed");
true
}
Ok(Err(e)) => {
tracing::warn!("Connection health check failed: {}", e);
false
}
Err(_) => {
tracing::warn!("Connection health check timed out");
false
}
}
}
/// 获取数据库类型
///
/// 根据数据库 URL 的协议部分解析数据库类型。
/// 支持的数据库类型包括 SQLite、PostgreSQL 和 MySQL。
///
/// # Arguments
///
/// * `url` - 数据库连接 URL
///
/// # Returns
///
/// 对应的 Sea-ORM 数据库后端类型
///
/// # Note
///
/// 如果 URL 无法识别,默认返回 SQLite 类型
fn get_database_backend(url: &str) -> sea_orm::DatabaseBackend {
if url.starts_with("sqlite:") {
sea_orm::DatabaseBackend::Sqlite
} else if url.starts_with("postgres:") || url.starts_with("postgresql:") {
sea_orm::DatabaseBackend::Postgres
} else if url.starts_with("mysql:") {
sea_orm::DatabaseBackend::MySql
} else {
sea_orm::DatabaseBackend::Sqlite
}
}
/// 获取健康检查查询语句
///
/// 根据数据库类型返回对应的健康检查 SQL 语句。
/// 所有支持的数据库类型都使用简单的 `SELECT 1` 查询,
/// 这是一个轻量级的查询,用于验证连接是否仍然有效。
///
/// # Arguments
///
/// * `url` - 数据库连接 URL
///
/// # Returns
///
/// 对应数据库类型的健康检查 SQL 语句
fn get_health_check_query(url: &str) -> &'static str {
match Self::get_database_backend(url) {
sea_orm::DatabaseBackend::Sqlite => "SELECT 1",
sea_orm::DatabaseBackend::Postgres => "SELECT 1",
sea_orm::DatabaseBackend::MySql => "SELECT 1",
// 处理未来可能新增的数据库类型
_ => "SELECT 1",
}
}
/// 清理无效连接
///
/// 遍历空闲连接池,验证每个连接的有效性,
/// 移除超时或断开连接的实例。
///
/// # Returns
///
/// 被移除的无效连接数量
pub async fn clean_invalid_connections(&self) -> u32 {
let mut idle = self.inner.idle_connections.lock().await;
let config = &self.inner.config;
let health_query = Self::get_health_check_query(&config.url_sanitized());
let backend = Self::get_database_backend(&config.url_sanitized());
let mut removed_count = 0;
// 保留有效连接
let mut valid_connections: Vec<DatabaseConnection> = Vec::with_capacity(idle.len());
for conn in idle.drain(..) {
// 执行健康检查(带超时)
let is_valid = timeout(
Duration::from_secs(2),
conn.execute_raw(sea_orm::Statement::from_string(backend, health_query.to_string())),
)
.await
.is_ok_and(|result| result.is_ok());
if is_valid {
valid_connections.push(conn);
} else {
removed_count += 1;
}
}
// 重建空闲连接队列
idle.extend(valid_connections);
// 更新总连接数
self.inner.total_count.fetch_sub(removed_count as u32, Ordering::SeqCst);
if removed_count > 0 {
tracing::info!(
"Cleaned {} invalid connections from pool (remaining idle: {})",
removed_count,
idle.len()
);
}
removed_count as u32
}
/// 验证并重新创建无效连接
///
/// 检查所有空闲连接的健康状态,自动替换无效连接。
/// 此方法会确保池中至少保持配置的最小连接数。
///
/// # Returns
///
/// 被重新创建的连接数量,或错误
pub async fn validate_and_recreate_connections(&self) -> Result<u32, sea_orm::DbErr> {
let mut idle = self.inner.idle_connections.lock().await;
let config = &self.inner.config;
let mut recreated_count: u32 = 0;
let health_query = Self::get_health_check_query(&config.url_sanitized());
let backend = Self::get_database_backend(&config.url_sanitized());
// 手动分区连接为有效和无效两组
let mut valid_connections: Vec<DatabaseConnection> = Vec::new();
let mut invalid_connections: Vec<DatabaseConnection> = Vec::new();
for conn in idle.drain(..) {
let is_valid = timeout(
Duration::from_secs(2),
conn.execute_raw(sea_orm::Statement::from_string(backend, health_query.to_string())),
)
.await
.is_ok_and(|result| result.is_ok());
if is_valid {
valid_connections.push(conn);
} else {
invalid_connections.push(conn);
}
}
let invalid_count = invalid_connections.len();
if invalid_count > 0 {
// 更新总连接数
self.inner.total_count.fetch_sub(invalid_count as u32, Ordering::SeqCst);
// 重建空闲队列(只保留有效连接)
idle.clear();
idle.extend(valid_connections);
tracing::warn!("Found {} invalid connections, removed from pool", invalid_count);
// 重新创建连接以维持最小连接数
let current_idle = idle.len();
let needed = config.min_connections().saturating_sub(current_idle as u32) as usize;
for _ in 0..needed {
match Self::create_connection(config).await {
Ok(new_conn) => {
idle.push(new_conn);
self.inner.total_count.fetch_add(1, Ordering::SeqCst);
recreated_count += 1;
}
Err(e) => {
tracing::error!("Failed to recreate connection: {}", e);
// 不要直接返回错误,而是记录并继续
// 已创建的有效连接已经保留在 idle 中
tracing::warn!(
"Health check completed with {} recreated connections (target: {})",
recreated_count,
needed
);
// 返回部分成功的结果,而不是完全失败
return Ok(recreated_count);
}
}
}
if recreated_count > 0 {
tracing::info!(
"Recreated {} connections to maintain minimum pool size",
recreated_count
);
}
} else {
// 没有无效连接,恢复有效连接到池中
idle.extend(valid_connections);
}
Ok(recreated_count)
}
/// 启动后台连接健康检查任务
///
/// 该任务会定期检查所有空闲连接的健康状态,
/// 自动移除无效连接并重建新连接以维持最小连接数。
///
/// 健康检查间隔默认为 30 秒,可通过环境变量 `DB_HEALTH_CHECK_INTERVAL` 配置(秒)。
#[cfg(feature = "pool-health-check")]
fn start_background_health_check(&self) {
let pool = self.clone();
let shutdown = self.inner.health_check_shutdown.clone();
// 从环境变量获取健康检查间隔,默认为 30 秒
let interval_secs = std::env::var("DB_HEALTH_CHECK_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(30);
tracing::info!(
"Starting background health check task with interval: {} seconds",
interval_secs
);
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(interval_secs));
loop {
tokio::select! {
_ = interval.tick() => {
// 执行连接健康检查
match pool.validate_and_recreate_connections().await {
Ok(recreated) => {
if recreated > 0 {
tracing::info!(
"Background health check: recreated {} connections",
recreated
);
} else {
tracing::debug!("Background health check: all connections healthy");
}
}
Err(e) => {
tracing::error!(
"Background health check failed: {}",
e
);
}
}
}
_ = shutdown.notified() => {
tracing::info!("Background health check task shutting down");
break;
}
}
}
});
}
/// 从池中获取连接
///
/// 实现连接获取逻辑,包括:
/// 1. 尝试从空闲连接队列获取
/// 2. 如果队列为空且未达到最大连接数,创建新连接
/// 3. 如果已达到最大连接数,等待其他连接释放(带超时)
///
/// 使用异步条件变量(Notify)实现高效的等待机制,避免忙等待。
///
/// # Returns
///
/// 成功获取的数据库连接
///
/// # Errors
///
/// 如果获取连接超时或创建连接失败,返回错误
async fn acquire_connection(&self) -> DbResult<DatabaseConnection> {
// 使用锁保护整个连接获取流程,避免竞争条件
let mut idle = self.inner.idle_connections.lock().await;
// 尝试从空闲队列获取
if !idle.is_empty() {
let active = self.inner.active_count.fetch_add(1, Ordering::SeqCst) + 1;
self.update_max_active(active);
self.inner.borrow_count.fetch_add(1, Ordering::SeqCst);
return idle.pop().ok_or_else(|| {
DbError::Connection(sea_orm::DbErr::ConnectionAcquire(sea_orm::ConnAcquireErr::Timeout))
});
}
// 检查是否达到最大连接数(在持有锁的情况下)
if self.inner.get_total_count() >= self.inner.config.max_connections() {
// 等待空闲连接(使用条件变量替代忙等待)
let timeout_duration = self.inner.config.acquire_timeout_duration();
self.inner.wait_count.fetch_add(1, Ordering::SeqCst);
// 释放锁并等待通知
drop(idle);
let result = timeout(timeout_duration, async {
let mut idle = self.inner.idle_connections.lock().await;
while idle.is_empty() {
// 重新注册通知(因为每次 await 后都需要重新注册)
let notified = self.inner.connection_available.notified();
drop(idle);
notified.await;
idle = self.inner.idle_connections.lock().await;
}
idle.pop()
})
.await;
return match result {
Ok(Some(conn)) => {
let active = self.inner.active_count.fetch_add(1, Ordering::SeqCst) + 1;
self.update_max_active(active);
self.inner.borrow_count.fetch_add(1, Ordering::SeqCst);
Ok(conn)
}
Ok(None) => Err(DbError::Connection(sea_orm::DbErr::ConnectionAcquire(
sea_orm::ConnAcquireErr::Timeout,
))),
Err(_) => Err(DbError::Connection(sea_orm::DbErr::ConnectionAcquire(
sea_orm::ConnAcquireErr::Timeout,
))),
};
}
// 创建新连接(在持有锁的情况下,确保不会超过最大连接数)
// 原子性地增加总连接数和活跃连接数
self.inner.increment_counts();
let active = self.inner.get_active_count();
self.update_max_active(active);
// 释放锁后再创建连接(避免阻塞其他操作)
drop(idle);
match Self::create_connection(&self.inner.config).await {
Ok(conn) => {
self.inner.borrow_count.fetch_add(1, Ordering::SeqCst);
Ok(conn)
}
Err(e) => {
// 创建失败,回滚计数
self.inner.decrement_active_count();
self.inner.decrement_total_count();
Err(e)
}
}
}
/// 归还连接到池中
///
/// 将使用完毕的连接归还到空闲连接队列。
/// 如果空闲队列已满(达到最大连接数),则丢弃该连接。
/// 归还后会通知一个等待的请求者有新连接可用。
///
/// # Arguments
///
/// * `conn` - 要归还的数据库连接
///
/// # Note
///
/// 此方法在 tokio 环境下使用 tokio::spawn 在后台执行,
/// 在非 tokio 环境下使用阻塞等待,避免连接泄漏。
pub(crate) fn release_connection(&self, conn: DatabaseConnection) {
self.inner.decrement_active_count();
let inner = self.inner.clone();
// 尝试立即获取锁(非阻塞)
if let Ok(mut idle) = inner.idle_connections.try_lock() {
if idle.len() < inner.config.max_connections() as usize {
idle.push(conn);
inner.connection_available.notify_one();
} else {
inner.decrement_total_count();
drop(conn); // 连接池已满,关闭连接
}
return;
}
// 如果在 tokio runtime 中,使用异步路径
if tokio::runtime::Handle::try_current().is_ok() {
tokio::spawn(async move {
let mut idle = inner.idle_connections.lock().await;
if idle.len() < inner.config.max_connections() as usize {
idle.push(conn);
inner.connection_available.notify_one();
} else {
inner.decrement_total_count();
drop(conn); // 连接池已满,关闭连接
}
});
} else {
// 非 tokio 环境:使用阻塞等待获取锁
// 这防止了在 Session Drop 时的连接泄漏
let mut idle = inner.idle_connections.blocking_lock();
if idle.len() < inner.config.max_connections() as usize {
idle.push(conn);
inner.connection_available.notify_one();
} else {
inner.decrement_total_count();
drop(conn); // 连接池已满,关闭连接
}
}
}
/// 获取连接池状态
///
/// 返回当前连接池的统计信息,包括总连接数、活跃连接数和空闲连接数。
///
/// # Returns
///
/// 连接池状态信息
///
/// # Example
///
/// ```rust
/// use dbnexus::DbPool;
///
/// # async fn example(pool: &DbPool) {
/// let status = pool.status();
/// println!("Total: {}, Active: {}, Idle: {}",
/// status.total, status.active, status.idle);
/// # }
/// ```
pub fn status(&self) -> PoolStatus {
let total = self.inner.get_total_count();
let active = self.inner.get_active_count();
let wait_count = self.inner.wait_count.load(Ordering::SeqCst);
let borrow_count = self.inner.borrow_count.load(Ordering::SeqCst);
let max_active = self.inner.max_active.load(Ordering::SeqCst);
PoolStatus {
total,
active,
idle: total.saturating_sub(active),
wait_count,
borrow_count,
max_active,
}
}
/// 获取配置
///
/// 返回连接池的配置引用。
///
/// # Returns
///
/// 连接池配置的引用
///
/// # Example
///
/// ```rust
/// use dbnexus::DbPool;
///
/// # async fn example(pool: &DbPool) {
/// let config = pool.config();
/// println!("Max connections: {}", config.max_connections());
/// # }
/// ```
pub fn config(&self) -> &DbConfig {
&self.inner.config
}
/// 运行自动迁移
///
/// 如果配置中启用了 `auto_migrate`,此方法会在连接池创建后自动执行迁移。
/// 也可以手动调用此方法来执行迁移。
///
/// # Returns
///
/// 成功应用的迁移数量
#[cfg(feature = "auto-migrate")]
pub async fn run_auto_migrate(&self) -> Result<u32, DbError> {
if let Some(migrations_dir) = self.inner.config.migrations_dir() {
tracing::info!("Running auto-migrate from directory: {}", migrations_dir.display());
self.run_migrations(migrations_dir).await
} else {
tracing::warn!("Auto-migrate enabled but migrations_dir not configured");
Ok(0)
}
}
/// 手动运行迁移
///
/// # Arguments
///
/// * `migrations_dir` - 迁移文件目录路径
///
/// # Returns
///
/// 成功应用的迁移数量
#[cfg(feature = "auto-migrate")]
pub async fn run_migrations(&self, migrations_dir: &std::path::Path) -> Result<u32, DbError> {
use crate::migration::MigrationExecutor;
let db_type = crate::DatabaseType::parse_database_type(&self.inner.config.url_sanitized());
// 获取一个连接来执行迁移
let connection = self.acquire_connection().await?;
// 克隆连接,因为执行器需要拥有连接
let connection_for_migration = connection.clone();
let mut executor = MigrationExecutor::new(connection_for_migration, db_type);
let applied = executor.run_migrations(migrations_dir).await?;
// 归还连接到池中
self.release_connection(connection);
Ok(applied)
}
}
/// DbPool 的优雅关闭
impl Drop for DbPool {
fn drop(&mut self) {
// 通知后台健康检查任务关闭
self.inner.health_check_shutdown.notify_one();
tracing::info!("DbPool dropped, shutdown signal sent to background health check task");
}
}
/// 连接池状态
#[derive(Debug, Clone)]
pub struct PoolStatus {
/// 总连接数
pub total: u32,
/// 活跃连接数
pub active: u32,
/// 空闲连接数
pub idle: u32,
/// 等待连接的请求数
pub wait_count: u32,
/// 借用次数
pub borrow_count: u64,
/// 最大活跃连接数(历史峰值)
pub max_active: u32,
}
// 实现 ConnectionPool trait
#[async_trait]
impl super::ConnectionPool for DbPool {
async fn get_session(&self, role: &str) -> DbResult<Session> {
self.get_session(role).await
}
fn status(&self) -> PoolStatus {
self.status()
}
fn config(&self) -> &DbConfig {
self.config()
}
}