import re
import os
import sys
import json
import mmap
import stat
import time
import shutil
import string
import subprocess
from nose import tools
from nose.tools import assert_equal
from nvme_test_logger import TestNVMeLogger
class TestNVMe(object):
def __init__(self):
self.ctrl = "XXX"
self.ns1 = "XXX"
self.test_log_dir = "XXX"
self.default_nsid = 0x1
self.config_file = 'config.json'
self.load_config()
self.validate_pci_device()
def __del__(self):
if self.clear_log_dir is True:
shutil.rmtree(self.log_dir, ignore_errors=True)
@tools.nottest
def validate_pci_device(self):
x1, x2, dev = self.ctrl.split('/')
cmd = cmd = "find /sys/devices -name \\*" + dev + " | grep -i pci"
err = subprocess.call(cmd, shell=True)
assert_equal(err, 0, "ERROR : Only NVMe PCI subsystem is supported")
@tools.nottest
def load_config(self):
with open(self.config_file) as data_file:
config = json.load(data_file)
self.ctrl = config['controller']
self.ns1 = config['ns1']
self.log_dir = config['log_dir']
self.clear_log_dir = False
if self.clear_log_dir is True:
shutil.rmtree(self.log_dir, ignore_errors=True)
if not os.path.exists(self.log_dir):
os.makedirs(self.log_dir)
@tools.nottest
def setup_log_dir(self, test_name):
self.test_log_dir = self.log_dir + "/" + test_name
if not os.path.exists(self.test_log_dir):
os.makedirs(self.test_log_dir)
sys.stdout = TestNVMeLogger(self.test_log_dir + "/" + "stdout.log")
sys.stderr = TestNVMeLogger(self.test_log_dir + "/" + "stderr.log")
@tools.nottest
def exec_cmd(self, cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
return proc.wait()
@tools.nottest
def nvme_reset_ctrl(self):
nvme_reset_cmd = "nvme reset " + self.ctrl
err = subprocess.call(nvme_reset_cmd,
shell=True,
stdout=subprocess.PIPE)
assert_equal(err, 0, "ERROR : nvme reset failed")
time.sleep(5)
rescan_cmd = "echo 1 > /sys/bus/pci/rescan"
proc = subprocess.Popen(rescan_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
time.sleep(5)
assert_equal(proc.wait(), 0, "ERROR : pci rescan failed")
@tools.nottest
def get_ctrl_id(self):
get_ctrl_id = "nvme list-ctrl " + self.ctrl
proc = subprocess.Popen(get_ctrl_id,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : nvme list-ctrl failed")
line = proc.stdout.readline()
ctrl_id = line.split(":")[1].strip()
return ctrl_id
@tools.nottest
def get_ns_list(self):
ns_list = []
ns_list_cmd = "nvme list-ns " + self.ctrl
proc = subprocess.Popen(ns_list_cmd,
shell=True,
stdout=subprocess.PIPE)
assert_equal(proc.wait(), 0, "ERROR : nvme list namespace failed")
for line in proc.stdout:
ns_list.append(string.replace(line.split(":")[1], '\n', ''))
return ns_list
@tools.nottest
def get_max_ns(self):
pattern = re.compile("^nn[ ]+: [0-9]", re.IGNORECASE)
max_ns = -1
max_ns_cmd = "nvme id-ctrl " + self.ctrl
proc = subprocess.Popen(max_ns_cmd,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : reading maximum namespace count failed")
for line in proc.stdout:
if pattern.match(line):
max_ns = line.split(":")[1].strip()
break
print(max_ns)
return int(max_ns)
@tools.nottest
def get_ncap(self):
pattern = re.compile("^tnvmcap[ ]+: [0-9]", re.IGNORECASE)
ncap = -1
ncap_cmd = "nvme id-ctrl " + self.ctrl
proc = subprocess.Popen(ncap_cmd,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : reading nvm capacity failed")
for line in proc.stdout:
if pattern.match(line):
ncap = line.split(":")[1].strip()
break
print(ncap)
return int(ncap)
@tools.nottest
def get_format(self):
nvm_format = 4096
nvm_format_cmd = "nvme id-ns " + self.ctrl + " -n1"
proc = subprocess.Popen(nvm_format_cmd,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : reading nvm capacity failed")
for line in proc.stdout:
if "in use" in line:
nvm_format = 2 ** int(line.split(":")[3].split()[0])
print(nvm_format)
return int(nvm_format)
@tools.nottest
def delete_all_ns(self):
delete_ns_cmd = "nvme delete-ns " + self.ctrl + " -n 0xFFFFFFFF"
assert_equal(self.exec_cmd(delete_ns_cmd), 0)
list_ns_cmd = "nvme list-ns " + self.ctrl + " --all | wc -l"
proc = subprocess.Popen(list_ns_cmd,
shell=True,
stdout=subprocess.PIPE)
output = proc.stdout.read().strip()
assert_equal(output, '0', "ERROR : deleting all namespace failed")
@tools.nottest
def create_ns(self, nsze, ncap, flbas, dps):
create_ns_cmd = "nvme create-ns " + self.ctrl + " --nsze=" + \
str(nsze) + " --ncap=" + str(ncap) + \
" --flbas=" + str(flbas) + " --dps=" + str(dps)
return self.exec_cmd(create_ns_cmd)
@tools.nottest
def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps):
err = self.create_ns(nsze, ncap, flbas, dps)
if err == 0:
time.sleep(2)
id_ns_cmd = "nvme id-ns " + self.ctrl + " -n " + str(nsid)
err = subprocess.call(id_ns_cmd,
shell=True,
stdout=subprocess.PIPE)
return err
@tools.nottest
def attach_ns(self, ctrl_id, ns_id):
attach_ns_cmd = "nvme attach-ns " + self.ctrl + \
" --namespace-id=" + str(ns_id) + \
" --controllers=" + ctrl_id
err = subprocess.call(attach_ns_cmd,
shell=True,
stdout=subprocess.PIPE)
time.sleep(5)
if err == 0:
self.nvme_reset_ctrl()
time.sleep(5)
err = 0 if stat.S_ISBLK(os.stat(self.ns1).st_mode) else 1
return err
@tools.nottest
def detach_ns(self, ctrl_id, nsid):
detach_ns_cmd = "nvme detach-ns " + self.ctrl + \
" --namespace-id=" + str(nsid) + \
" --controllers=" + ctrl_id
return subprocess.call(detach_ns_cmd,
shell=True,
stdout=subprocess.PIPE)
@tools.nottest
def delete_and_validate_ns(self, nsid):
delete_ns_cmd = "nvme delete-ns " + self.ctrl + " -n " + str(nsid)
err = subprocess.call(delete_ns_cmd,
shell=True,
stdout=subprocess.PIPE)
assert_equal(err, 0, "ERROR : delete namespace failed")
return err
def get_smart_log(self, nsid):
smart_log_cmd = "nvme smart-log " + self.ctrl + " -n " + str(nsid)
print(smart_log_cmd)
proc = subprocess.Popen(smart_log_cmd,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : nvme smart log failed")
for line in proc.stdout:
if "data_units_read" in line:
data_units_read = \
string.replace(line.split(":")[1].strip(), ",", "")
if "data_units_written" in line:
data_units_written = \
string.replace(line.split(":")[1].strip(), ",", "")
if "host_read_commands" in line:
host_read_commands = \
string.replace(line.split(":")[1].strip(), ",", "")
if "host_write_commands" in line:
host_write_commands = \
string.replace(line.split(":")[1].strip(), ",", "")
print("data_units_read " + data_units_read)
print("data_units_written " + data_units_written)
print("host_read_commands " + host_read_commands)
print("host_write_commands " + host_write_commands)
return err
def get_id_ctrl(self, vendor=False):
if not vendor:
id_ctrl_cmd = "nvme id-ctrl " + self.ctrl
else:
id_ctrl_cmd = "nvme id-ctrl -v " + self.ctrl
print(id_ctrl_cmd)
proc = subprocess.Popen(id_ctrl_cmd,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : nvme id controller failed")
return err
def get_error_log(self):
pattern = re.compile("^ Entry\[[ ]*[0-9]+\]")
error_log_cmd = "nvme error-log " + self.ctrl
proc = subprocess.Popen(error_log_cmd,
shell=True,
stdout=subprocess.PIPE)
err = proc.wait()
assert_equal(err, 0, "ERROR : nvme error log failed")
line = proc.stdout.readline()
err_log_entry_count = int(line.split(" ")[5].strip().split(":")[1])
entry_count = 0
for line in proc.stdout:
if pattern.match(line):
entry_count += 1
return 0 if err_log_entry_count == entry_count else 1
def run_ns_io(self, nsid, lbads):
block_size = mmap.PAGESIZE if lbads < 9 else 2 ** int(lbads)
ns_path = self.ctrl + "n" + str(nsid)
io_cmd = "dd if=" + ns_path + " of=/dev/null" + " bs=" + \
str(block_size) + " count=10 > /dev/null 2>&1"
print(io_cmd)
run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE)
run_io_result = run_io.communicate()[1]
assert_equal(run_io_result, None)
io_cmd = "dd if=/dev/zero of=" + ns_path + " bs=" + \
str(block_size) + " count=10 > /dev/null 2>&1"
print(io_cmd)
run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE)
run_io_result = run_io.communicate()[1]
assert_equal(run_io_result, None)